Add multi-threading to rrd-extractstats.pl

This commit is contained in:
Robert Lanyi 2020-03-21 15:26:53 +01:00
parent 5b14fb721f
commit 245dcd591a

View File

@ -6,10 +6,15 @@
# mod for rrd path sjc # mod for rrd path sjc
use strict; use strict;
use warnings;
use RRDs; use RRDs;
use File::Find; use File::Find;
use File::Find::Rule; use File::Find::Rule;
use threads ;#qw( async );
use threads::shared;
use Thread::Queue qw( );
if ($#ARGV < 2) { if ($#ARGV < 2) {
die("Usage: $0 <path to RRD file directory> <path to known links file> outfile [interval-hours]\n"); die("Usage: $0 <path to RRD file directory> <path to known links file> outfile [interval-hours]\n");
} }
@ -34,22 +39,52 @@ my @links = values %knownlinks;
my @rrdfiles = File::Find::Rule->maxdepth(2)->file->in($rrdpath); my @rrdfiles = File::Find::Rule->maxdepth(2)->file->in($rrdpath);
my $astraffic = {};
$|=1; $|=1;
my $i = 0; my $i :shared = 0;
my $cpus = do { local @ARGV='/proc/cpuinfo'; grep /^processor\s+:/, <>;};
my $num_workers = $cpus / 2;
my $num_work_units = scalar @rrdfiles;
my $q = Thread::Queue->new();
my $rq = Thread::Queue->new();
# Create work
foreach my $rrdfile (@rrdfiles) { foreach my $rrdfile (@rrdfiles) {
if ($rrdfile =~ /\/(\d+).rrd$/) { if ($rrdfile =~ /\/(\d+).rrd$/) {
my $as = $1; my $as = $1;
$q->enqueue($as);
$astraffic->{$as} = gettraffic($as, time - $interval, time);
$i++;
if ($i % 100 == 0) {
print "$i... ";
}
} }
} }
print "\n";
# Create workers
my @workers;
for (1..$num_workers) {
push @workers, async {
while (defined(my $as = $q->dequeue())) {
my $result->{as} = $as;
$result->{result} = gettraffic($as, time - $interval, time);
# Put result to result queue
$rq->enqueue($result);
$i++;
if ($i % 100 == 0) {
print int($i / $num_work_units * 100 * 100) / 100 . "%... ";
}
}
};
}
# Tell workers they are no longer needed.
$q->enqueue(undef) for @workers;
# Wait for workers to end
$_->join() for @workers;
print "100%";
$rq->end();
my $query = 'create table stats("asn" int'; my $query = 'create table stats("asn" int';
foreach my $link (@links) { foreach my $link (@links) {
@ -63,16 +98,16 @@ $db->do('PRAGMA synchronous = OFF');
$db->do('drop table if exists stats'); $db->do('drop table if exists stats');
$db->do($query); $db->do($query);
# print data # read resultqueue and print data
foreach my $as (keys %{ $astraffic }) { while (my $result = $rq->dequeue) {
my $as = $result->{as};
$query = "insert into stats values('$as'"; $query = "insert into stats values('$as'";
foreach my $link (@links) { foreach my $link (@links) {
$query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_in"}) . "'"; $query .= ", '" . undefaszero($result->{result}->{"${link}_in"}) . "'";
$query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_out"}) . "'"; $query .= ", '" . undefaszero($result->{result}->{"${link}_out"}) . "'";
$query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_v6_in"}) . "'"; $query .= ", '" . undefaszero($result->{result}->{"${link}_v6_in"}) . "'";
$query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_v6_out"}) . "'"; $query .= ", '" . undefaszero($result->{result}->{"${link}_v6_out"}) . "'";
} }
$query .= ');'; $query .= ');';
$db->do($query); $db->do($query);