diff --git a/bin/rrd-extractstats.pl b/bin/rrd-extractstats.pl index 9a824ab..0384ce1 100755 --- a/bin/rrd-extractstats.pl +++ b/bin/rrd-extractstats.pl @@ -10,11 +10,17 @@ use warnings; use RRDs; use File::Find; use File::Find::Rule; +use DBI; +use TryCatch; +use File::Copy qw(copy); +use File::stat; use threads ;#qw( async ); use threads::shared; use Thread::Queue qw( ); +use Time::HiRes qw(time); + if ($#ARGV < 2) { die("Usage: $0 outfile [interval-hours]\n"); } @@ -34,47 +40,84 @@ read_knownlinks(); my @links = values %knownlinks; +# If the DB has it, get latest check timestamp for every ASN we are aware of +my $db_version = 1; +my $as_list; +my $db; +try { + if (-r $statsfile) { + copy($statsfile, "$statsfile.tmp"); + } + $db = DBI->connect("dbi:SQLite:dbname=$statsfile.tmp", '', ''); + + # Get last check timestamps + my $sth = $db->prepare("SELECT asn, checked_at FROM stats") or die('field missing'); + $sth->execute(); + while(my($item, $data) = $sth->fetchrow_array()) { + as_list->{$item} = $data; + } + + $db_version = 2; +} catch ($e) { + print("Previously generated database not found or checked_at field is missing, proceed with all RRD files. ($e)\n"); +} + # walk through all RRD files in the given path and extract stats for all links # from them; write the stats to an sqlite database -my @rrdfiles = File::Find::Rule->maxdepth(2)->file->in($rrdpath); +my @rrdfiles = File::Find::Rule->maxdepth(2)->file()->name('*.rrd')->in($rrdpath); $|=1; -my $i :shared = 0; my $num_workers = 1; if (($ENV{'THREADS'} =~ /^\d+$/) and ($ENV{'THREADS'} > 0)) { $num_workers = $ENV{'THREADS'}; } -print("Using " . $num_workers . " threads.\n"); my $num_work_units = scalar @rrdfiles; +print("Using " . $num_workers . " threads to process " . $num_work_units . " RRD files.\n"); + my $q = Thread::Queue->new(); my $rq = Thread::Queue->new(); # Create work foreach my $rrdfile (@rrdfiles) { if ($rrdfile =~ /\/(\d+).rrd$/) { - my $as = $1; - $q->enqueue($as); + my $task->{as} = $1; + $task->{filename} = $rrdfile; + $q->enqueue($task); } } +my $i :shared = 0; +my $skipped :shared = 0; +my $t :shared = scalar time; +my $t0 :shared = scalar time; + # Create workers my @workers; for (1..$num_workers) { push @workers, async { - while (defined(my $as = $q->dequeue())) { - my $result->{as} = $as; - $result->{result} = gettraffic($as, time - $interval, time); + while (defined(my $task = $q->dequeue())) { + if ($as_list->{$task->{as}} and (!(stat($task->{filename})->mtime > $as_list->{$task->{as}}))) { + $skipped += 1; + } else { + my $result->{as} = $task->{as}; + $result->{checked_at} = int time; + $result->{result} = gettraffic($task->{as}, int time - $interval, int time); - # Put result to result queue - $rq->enqueue($result); + # Put result to result queue + $rq->enqueue($result); + } $i++; if ($i % 100 == 0) { - print int($i / $num_work_units * 100 * 100) / 100 . "%... "; + my $average_speed = int(($i / (scalar time - $t0)) * 100) / 100; + my $current_speed = int((100 / (scalar time - $t)) * 100) / 100; + my $seconds_left = ($num_work_units - $i) / $average_speed; + printf("%.2f%% (files per sec cur/avg %.2f/%.2f, proc/skip/total %d/%d/%d, %02d:%02d:%02d left)\n", int($i / $num_work_units * 100 * 100) / 100, $current_speed, $average_speed, ($i - $skipped), $skipped, $num_work_units, $seconds_left / 3600, $seconds_left / 60 % 60, $seconds_left % 60); + $t = scalar time; } } }; @@ -86,26 +129,28 @@ $q->enqueue(undef) for @workers; # Wait for workers to end $_->join() for @workers; -print "100%"; +printf("100%% (processed %d RRD files, skipped %d because those files didn't change since last run)\n", $num_work_units - $skipped, $skipped); $rq->end(); -my $query = 'create table stats("asn" int'; -foreach my $link (@links) { - $query .= ", \"${link}_in\" int, \"${link}_out\" int, \"${link}_v6_in\" int, \"${link}_v6_out\" int"; -} -$query .= ');'; -use DBI; -my $db = DBI->connect("dbi:SQLite:dbname=$statsfile.tmp", '', ''); $db->do('PRAGMA synchronous = OFF'); -$db->do('drop table if exists stats'); -$db->do($query); +my $query; +# Recreate the table if we didn't have the checked_at column above +if ($db_version < 2) { + $db->do('DROP TABLE IF EXISTS stats;'); + + $query = 'CREATE TABLE stats("asn" INT PRIMARY KEY, "checked_at" INT'; + foreach my $link (@links) { + $query .= ", \"${link}_in\" INT, \"${link}_out\" INT, \"${link}_v6_in\" INT, \"${link}_v6_out\" INT"; + } + $query .= ');'; + $db->do($query); +} # read resultqueue and print data while (my $result = $rq->dequeue) { - my $as = $result->{as}; - $query = "insert into stats values('$as'"; + $query = "INSERT OR REPLACE INTO stats VALUES ($result->{as}, $result->{checked_at}"; foreach my $link (@links) { $query .= ", '" . undefaszero($result->{result}->{"${link}_in"}) . "'";