From 245dcd591aca822740eabe827a11006524dec426 Mon Sep 17 00:00:00 2001 From: Robert Lanyi Date: Sat, 21 Mar 2020 15:26:53 +0100 Subject: [PATCH] Add multi-threading to rrd-extractstats.pl --- bin/rrd-extractstats.pl | 71 ++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 18 deletions(-) diff --git a/bin/rrd-extractstats.pl b/bin/rrd-extractstats.pl index ef9f378..685f625 100755 --- a/bin/rrd-extractstats.pl +++ b/bin/rrd-extractstats.pl @@ -6,10 +6,15 @@ # mod for rrd path sjc use strict; +use warnings; use RRDs; use File::Find; use File::Find::Rule; +use threads ;#qw( async ); +use threads::shared; +use Thread::Queue qw( ); + if ($#ARGV < 2) { die("Usage: $0 outfile [interval-hours]\n"); } @@ -34,22 +39,52 @@ my @links = values %knownlinks; my @rrdfiles = File::Find::Rule->maxdepth(2)->file->in($rrdpath); -my $astraffic = {}; - $|=1; -my $i = 0; +my $i :shared = 0; + +my $cpus = do { local @ARGV='/proc/cpuinfo'; grep /^processor\s+:/, <>;}; +my $num_workers = $cpus / 2; +my $num_work_units = scalar @rrdfiles; + +my $q = Thread::Queue->new(); +my $rq = Thread::Queue->new(); + +# Create work foreach my $rrdfile (@rrdfiles) { if ($rrdfile =~ /\/(\d+).rrd$/) { my $as = $1; - - $astraffic->{$as} = gettraffic($as, time - $interval, time); - $i++; - if ($i % 100 == 0) { - print "$i... "; - } + $q->enqueue($as); } } -print "\n"; + +# Create workers +my @workers; +for (1..$num_workers) { + push @workers, async { + while (defined(my $as = $q->dequeue())) { + my $result->{as} = $as; + $result->{result} = gettraffic($as, time - $interval, time); + + # Put result to result queue + $rq->enqueue($result); + + $i++; + if ($i % 100 == 0) { + print int($i / $num_work_units * 100 * 100) / 100 . "%... "; + } + } + }; +} + +# Tell workers they are no longer needed. +$q->enqueue(undef) for @workers; + +# Wait for workers to end +$_->join() for @workers; + +print "100%"; + +$rq->end(); my $query = 'create table stats("asn" int'; foreach my $link (@links) { @@ -63,16 +98,16 @@ $db->do('PRAGMA synchronous = OFF'); $db->do('drop table if exists stats'); $db->do($query); -# print data -foreach my $as (keys %{ $astraffic }) { - +# read resultqueue and print data +while (my $result = $rq->dequeue) { + my $as = $result->{as}; $query = "insert into stats values('$as'"; - + foreach my $link (@links) { - $query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_in"}) . "'"; - $query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_out"}) . "'"; - $query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_v6_in"}) . "'"; - $query .= ", '" . undefaszero($astraffic->{$as}->{"${link}_v6_out"}) . "'"; + $query .= ", '" . undefaszero($result->{result}->{"${link}_in"}) . "'"; + $query .= ", '" . undefaszero($result->{result}->{"${link}_out"}) . "'"; + $query .= ", '" . undefaszero($result->{result}->{"${link}_v6_in"}) . "'"; + $query .= ", '" . undefaszero($result->{result}->{"${link}_v6_out"}) . "'"; } $query .= ');'; $db->do($query);