mirror of
https://github.com/manuelkasper/AS-Stats.git
synced 2025-02-20 11:44:12 +08:00
Merge pull request #87 from rlanyi/extractstats-skipifnotmodified
Skip analyzing traffic data if RRD file hasn't changed since last run
This commit is contained in:
commit
ab8a48e33e
@ -10,11 +10,17 @@ use warnings;
|
|||||||
use RRDs;
|
use RRDs;
|
||||||
use File::Find;
|
use File::Find;
|
||||||
use File::Find::Rule;
|
use File::Find::Rule;
|
||||||
|
use DBI;
|
||||||
|
use TryCatch;
|
||||||
|
use File::Copy qw(copy);
|
||||||
|
use File::stat;
|
||||||
|
|
||||||
use threads ;#qw( async );
|
use threads ;#qw( async );
|
||||||
use threads::shared;
|
use threads::shared;
|
||||||
use Thread::Queue qw( );
|
use Thread::Queue qw( );
|
||||||
|
|
||||||
|
use Time::HiRes qw(time);
|
||||||
|
|
||||||
if ($#ARGV < 2) {
|
if ($#ARGV < 2) {
|
||||||
die("Usage: $0 <path to RRD file directory> <path to known links file> outfile [interval-hours]\n");
|
die("Usage: $0 <path to RRD file directory> <path to known links file> outfile [interval-hours]\n");
|
||||||
}
|
}
|
||||||
@ -34,47 +40,84 @@ read_knownlinks();
|
|||||||
|
|
||||||
my @links = values %knownlinks;
|
my @links = values %knownlinks;
|
||||||
|
|
||||||
|
# If the DB has it, get latest check timestamp for every ASN we are aware of
|
||||||
|
my $db_version = 1;
|
||||||
|
my $as_list;
|
||||||
|
my $db;
|
||||||
|
try {
|
||||||
|
if (-r $statsfile) {
|
||||||
|
copy($statsfile, "$statsfile.tmp");
|
||||||
|
}
|
||||||
|
$db = DBI->connect("dbi:SQLite:dbname=$statsfile.tmp", '', '');
|
||||||
|
|
||||||
|
# Get last check timestamps
|
||||||
|
my $sth = $db->prepare("SELECT asn, checked_at FROM stats") or die('field missing');
|
||||||
|
$sth->execute();
|
||||||
|
while(my($item, $data) = $sth->fetchrow_array()) {
|
||||||
|
$as_list->{$item} = $data;
|
||||||
|
}
|
||||||
|
|
||||||
|
$db_version = 2;
|
||||||
|
} catch ($e) {
|
||||||
|
print("Previously generated database not found or checked_at field is missing, proceed with all RRD files. ($e)\n");
|
||||||
|
}
|
||||||
|
|
||||||
# walk through all RRD files in the given path and extract stats for all links
|
# walk through all RRD files in the given path and extract stats for all links
|
||||||
# from them; write the stats to an sqlite database
|
# from them; write the stats to an sqlite database
|
||||||
|
|
||||||
my @rrdfiles = File::Find::Rule->maxdepth(2)->file->in($rrdpath);
|
my @rrdfiles = File::Find::Rule->maxdepth(2)->file()->name('*.rrd')->in($rrdpath);
|
||||||
|
|
||||||
$|=1;
|
$|=1;
|
||||||
my $i :shared = 0;
|
|
||||||
|
|
||||||
my $num_workers = 1;
|
my $num_workers = 1;
|
||||||
if (($ENV{'THREADS'}) and ($ENV{'THREADS'} =~ /^\d+$/) and ($ENV{'THREADS'} > 0)) {
|
if (($ENV{'THREADS'}) and ($ENV{'THREADS'} =~ /^\d+$/) and ($ENV{'THREADS'} > 0)) {
|
||||||
$num_workers = $ENV{'THREADS'};
|
$num_workers = $ENV{'THREADS'};
|
||||||
}
|
}
|
||||||
print("Using " . $num_workers . " threads.\n");
|
|
||||||
|
|
||||||
my $num_work_units = scalar @rrdfiles;
|
my $num_work_units = scalar @rrdfiles;
|
||||||
|
|
||||||
|
print("Using " . $num_workers . " threads to process " . $num_work_units . " RRD files.\n");
|
||||||
|
|
||||||
my $q = Thread::Queue->new();
|
my $q = Thread::Queue->new();
|
||||||
my $rq = Thread::Queue->new();
|
my $rq = Thread::Queue->new();
|
||||||
|
|
||||||
# Create work
|
# Create work
|
||||||
foreach my $rrdfile (@rrdfiles) {
|
foreach my $rrdfile (@rrdfiles) {
|
||||||
if ($rrdfile =~ /\/(\d+).rrd$/) {
|
if ($rrdfile =~ /\/(\d+).rrd$/) {
|
||||||
my $as = $1;
|
my $task->{as} = $1;
|
||||||
$q->enqueue($as);
|
$task->{filename} = $rrdfile;
|
||||||
|
$q->enqueue($task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
my $i :shared = 0;
|
||||||
|
my $skipped :shared = 0;
|
||||||
|
my $t :shared = scalar time;
|
||||||
|
my $t0 :shared = scalar time;
|
||||||
|
|
||||||
# Create workers
|
# Create workers
|
||||||
my @workers;
|
my @workers;
|
||||||
for (1..$num_workers) {
|
for (1..$num_workers) {
|
||||||
push @workers, async {
|
push @workers, async {
|
||||||
while (defined(my $as = $q->dequeue())) {
|
while (defined(my $task = $q->dequeue())) {
|
||||||
my $result->{as} = $as;
|
if ($as_list->{$task->{as}} and (!(stat($task->{filename})->mtime > $as_list->{$task->{as}}))) {
|
||||||
$result->{result} = gettraffic($as, time - $interval, time);
|
$skipped += 1;
|
||||||
|
} else {
|
||||||
|
my $result->{as} = $task->{as};
|
||||||
|
$result->{checked_at} = int time;
|
||||||
|
$result->{result} = gettraffic($task->{as}, int time - $interval, int time);
|
||||||
|
|
||||||
# Put result to result queue
|
# Put result to result queue
|
||||||
$rq->enqueue($result);
|
$rq->enqueue($result);
|
||||||
|
}
|
||||||
|
|
||||||
$i++;
|
$i++;
|
||||||
if ($i % 100 == 0) {
|
if ($i % 100 == 0) {
|
||||||
print int($i / $num_work_units * 100 * 100) / 100 . "%... ";
|
my $average_speed = int(($i / (scalar time - $t0)) * 100) / 100;
|
||||||
|
my $current_speed = int((100 / (scalar time - $t)) * 100) / 100;
|
||||||
|
my $seconds_left = ($num_work_units - $i) / $average_speed;
|
||||||
|
printf("%.2f%% (files per sec cur/avg %.2f/%.2f, proc/skip/total %d/%d/%d, %02d:%02d:%02d left)\n", int($i / $num_work_units * 100 * 100) / 100, $current_speed, $average_speed, ($i - $skipped), $skipped, $num_work_units, $seconds_left / 3600, $seconds_left / 60 % 60, $seconds_left % 60);
|
||||||
|
$t = scalar time;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -86,26 +129,28 @@ $q->enqueue(undef) for @workers;
|
|||||||
# Wait for workers to end
|
# Wait for workers to end
|
||||||
$_->join() for @workers;
|
$_->join() for @workers;
|
||||||
|
|
||||||
print "100%";
|
printf("100%% (processed %d RRD files, skipped %d because those files didn't change since last run)\n", $num_work_units - $skipped, $skipped);
|
||||||
|
|
||||||
$rq->end();
|
$rq->end();
|
||||||
|
|
||||||
my $query = 'create table stats("asn" int';
|
|
||||||
foreach my $link (@links) {
|
|
||||||
$query .= ", \"${link}_in\" int, \"${link}_out\" int, \"${link}_v6_in\" int, \"${link}_v6_out\" int";
|
|
||||||
}
|
|
||||||
$query .= ');';
|
|
||||||
|
|
||||||
use DBI;
|
|
||||||
my $db = DBI->connect("dbi:SQLite:dbname=$statsfile.tmp", '', '');
|
|
||||||
$db->do('PRAGMA synchronous = OFF');
|
$db->do('PRAGMA synchronous = OFF');
|
||||||
$db->do('drop table if exists stats');
|
my $query;
|
||||||
$db->do($query);
|
# Recreate the table if we didn't have the checked_at column above
|
||||||
|
if ($db_version < 2) {
|
||||||
|
$db->do('DROP TABLE IF EXISTS stats;');
|
||||||
|
|
||||||
|
$query = 'CREATE TABLE stats("asn" INT PRIMARY KEY, "checked_at" INT';
|
||||||
|
foreach my $link (@links) {
|
||||||
|
$query .= ", \"${link}_in\" INT, \"${link}_out\" INT, \"${link}_v6_in\" INT, \"${link}_v6_out\" INT";
|
||||||
|
}
|
||||||
|
$query .= ');';
|
||||||
|
$db->do($query);
|
||||||
|
}
|
||||||
|
|
||||||
# read resultqueue and print data
|
# read resultqueue and print data
|
||||||
while (my $result = $rq->dequeue) {
|
while (my $result = $rq->dequeue) {
|
||||||
my $as = $result->{as};
|
$query = "INSERT OR REPLACE INTO stats VALUES ($result->{as}, $result->{checked_at}";
|
||||||
$query = "insert into stats values('$as'";
|
|
||||||
|
|
||||||
foreach my $link (@links) {
|
foreach my $link (@links) {
|
||||||
$query .= ", '" . undefaszero($result->{result}->{"${link}_in"}) . "'";
|
$query .= ", '" . undefaszero($result->{result}->{"${link}_in"}) . "'";
|
||||||
|
Loading…
x
Reference in New Issue
Block a user