#!/usr/bin/perl -w

use strict;
use POSIX qw( :sys_wait_h );
use Sys::SigAction qw( set_sig_handler );
use Date::Parse;
use Date::Format;
use FS::Daemon ':all'; #daemonize1 drop_root daemonize2 myexit logfile sig*
use FS::UID qw( adminsuidsetup forksuidsetup dbh driver_name );
use FS::Record qw( qsearch str2time_sql str2time_sql_closing concat_sql );
use FS::torrus_srvderive;

# Debug verbosity: messages are gated on $DEBUG, $DEBUG > 1 and $DEBUG > 2.
our $DEBUG = 2;

# Upper bound on concurrently running child processes.
our $max_kids = 6;

# Seconds a child lets its search query run before giving up.
our $search_timeout = 30*60; # 15*60 5*60; #$torrus_srvderive->last_srv_date ? 5*60 : 15*60);

# pid => 1 for every child forked and not yet reaped.
our %kids;

# The first command-line argument is required; usage() dies with the
# usage message when it is missing.
my $user = shift or usage();

# Daemon startup: detach, drop privileges, connect as $user, open the log.
$FS::Daemon::PID_NEWSTYLE = 1;
daemonize1('torrus-srvderive');

drop_root();

adminsuidsetup($user);

logfile( "%%%FREESIDE_LOG%%%/torrus-srvderive-log.". $FS::UID::datasrc );

daemonize2();

our $conf = FS::Conf->new;

die "not running: network_monitoring_system not Torrus_Internal\n"
  unless _shouldrun();

#--

# Build the SQL expression that converts "<table>.srv_date <table>.srv_time"
# into whatever str2time_sql() produces for the current database driver.
# Under Pg the concatenated text is cast to TIMESTAMP first.
my $epoch_sql = sub {
  my $table = shift;
  my $expr = concat_sql([ "$table.srv_date", "' '", "$table.srv_time" ]);
  $expr = "CAST( $expr AS TIMESTAMP )" if driver_name() =~ /^Pg/i;
  return str2time_sql() . $expr . str2time_sql_closing();
};

my $_date      = $epoch_sql->('srvexport');
my $other_date = $epoch_sql->('other');

# Placeholders for "<serviceid>_IN" and "<serviceid>_OUT"; each one binds
# the derived serviceid when the search statement is executed.
my $in  = concat_sql([ '?', "'_IN'" ]);
my $out = concat_sql([ '?', "'_OUT'" ]);

# Sample times for which no derived (_IN/_OUT) row exists on the same
# srv_date within 60 seconds.
my $sql =
    " SELECT DISTINCT srv_date, srv_time FROM srvexport"
  . " WHERE NOT EXISTS ("
  . " SELECT 1 FROM srvexport AS other"
  . " WHERE other.serviceid IN ( $in, $out )"
  . " AND srvexport.srv_date = other.srv_date"
  . " AND ABS( $_date - $other_date ) <= 60"
  . " ) ";

my $orderlimit = " ORDER BY srv_date, srv_time LIMIT 100 "; #50?
# Number of children currently tracked in %kids.
our $kids = 0;

# Main daemon loop: for every torrus_srvderive record, fork a child that
# finds sample times lacking a derived (virtual port) row and inserts the
# summed _IN / _OUT values of the component services for those times.
#MAIN: while (1) {
while (1) {

  my $found = 0;

  #SERVICEID: foreach my $torrus_srvderive ( qsearch('torrus_srvderive', {}) ) {
  foreach my $torrus_srvderive ( qsearch('torrus_srvderive', {}) ) {

    # Throttle: wait for a free slot, then retry the same record.
    reap_kids();
    if ( $kids >= $max_kids ) {
      sleep 5;
      myexit() if sigterm() || sigint();
      redo;
    }

    defined( my $pid = fork ) or do {
      #warn "WARNING: can't fork: $!\n";
      #next; #don't increment the kid counter
      die "can't fork: $!\n";
    };

    if ( $pid ) {

      # parent: just account for the new child
      $kids++;
      $kids{$pid} = 1;

    } else { #kid time

      #get new db handle
      $FS::UID::dbh->{InactiveDestroy} = 1;
      forksuidsetup($user);

      my $serviceid = $torrus_srvderive->serviceid;

      my @serviceids = $torrus_srvderive->component_serviceids;
      exit unless @serviceids; #don't try to search for empty virtual ports

      #nonlocking select statements; rows in this table never change
      if ( driver_name() eq 'mysql' ) {
        dbh->do('SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED');
        dbh->commit or die dbh->errstr;
      }

      # Quoted, comma-joined component serviceids per direction, built once
      # and reused for every row processed below.
      my %quoted_for;
      for my $dir ('_IN', '_OUT') {
        $quoted_for{$dir} = join(',', map dbh->quote("$_$dir"), @serviceids);
      }

      # All component ids: the _IN ones followed by the _OUT ones.
      my $in = join(',', @quoted_for{'_IN', '_OUT'});

      if ( ! $torrus_srvderive->last_srv_date ) {
        warn "finding initial last_srv_date for $serviceid\n" if $DEBUG;
        my $dsql = "SELECT srv_date FROM srvexport WHERE serviceid IN ($in) ORDER BY srv_date LIMIT 1";
        my $dsth = dbh->prepare($dsql) or die $DBI::errstr;
        $dsth->execute or die $dsth->errstr;

        # fetchrow_arrayref returns undef when there are no rows, so check
        # the row before dereferencing it; otherwise the child would die
        # here and never reach the "skipping" branch.
        my $drow = $dsth->fetchrow_arrayref;
        my $date = $drow ? $drow->[0] : '';

        if ( $date ) {
          warn "found initial last_srv_date of $date; updating $serviceid\n"
            if $DEBUG;
          $torrus_srvderive->last_srv_date($date);
          my $error = $torrus_srvderive->replace;
          die $error if $error;
        } else {
          warn "no initial last_srv_date for $serviceid; skipping\n"
            if $DEBUG;
          exit;
        }
      }

      # Restrict the shared "missing derived row" query to times at which
      # at least one component service does have a sample.
      my $ssql =
          " $sql AND EXISTS ("
        . " SELECT 1 FROM srvexport AS other"
        . " WHERE other.serviceid IN ($in)"
        . " AND srvexport.srv_date = other.srv_date"
        . " AND ABS( $_date - $other_date ) <= 60"
        . " ) ";

      # Let the driver quote the date rather than wrapping it in
      # hand-written quotes.
      $ssql .= " AND srv_date >= ". dbh->quote($torrus_srvderive->last_srv_date). " "
        if $torrus_srvderive->last_srv_date;

      $ssql .= $orderlimit;

      warn "searching for times to add $serviceid\n" if $DEBUG;
      warn $ssql if $DEBUG > 2;
      my $sth = dbh->prepare($ssql) or die $DBI::errstr; #better recovery here?

      # Run the search under an alarm so a runaway query cannot hold this
      # child's slot forever.
      eval {
        my $h = set_sig_handler( 'ALRM', sub { die "_timeout\n"; } );
        alarm($search_timeout);
        $sth->execute($serviceid, $serviceid) or die $sth->errstr;
        alarm(0);
      };
      my $search_error = $@; # capture before anything else can reset it
      alarm(0);

      if ( $search_error && $search_error eq "_timeout\n" ) {
        #warn "search timed out; reconnecting and restarting\n";
        warn "search timed out\n";
        dbh->clone()->do("KILL QUERY ". dbh->{"mysql_thread_id"})
          if driver_name() eq 'mysql';
        dbh->rollback; #or die dbh->errstr;
        #adminsuidsetup($user);
        #next SERVICEID; #MAIN;
        exit;
      } elsif ( $search_error ) {
        die $search_error;
      }

      warn "search for $serviceid finished; checking results\n" if $DEBUG;

      my $prev = 0;
      while ( my $row = $sth->fetchrow_arrayref ) {
        last if sigterm() || sigint();

        my( $srv_date, $srv_time ) = @$row;
        my $cur = str2time( "$srv_date $srv_time" );

        # Skip times within 60 seconds of the previous one handled, and
        # stop at samples less than five minutes old.
        next if $cur - $prev <= 60;
        last if time() - $cur <= 300;

        warn "no $serviceid for $srv_date $srv_time; adding\n" if $DEBUG;
        $found++;

        for my $dir ('_IN', '_OUT') {

          my $sin = $quoted_for{$dir};

          # Sum of the component values within 60 seconds of $cur.
          my $sum =
              " SELECT COALESCE(SUM(value),0) FROM srvexport AS other"
            . " WHERE other.serviceid IN ($sin)"
            . " AND ABS( $cur - $other_date ) <= 60 ";

          my $isql =
              " INSERT INTO srvexport ( srv_date, srv_time, serviceid, value, intvl )"
            . " VALUES ( ?, ?, ?, ($sum), ? ) ";

          my @param = (
            time2str('%Y-%m-%d', $cur), #srv_date
            time2str('%X', $cur),       #srv_time
            "$serviceid$dir",
            300,                        #intvl ...
          );
          warn $isql. ' with param '. join(',',@param). "\n" if $DEBUG > 2;

          my $isth = dbh->prepare($isql) or die $DBI::errstr; #better recovery?

          #stupid mysql deadlocks all the time on insert, so we need to recover
          unless ( $isth->execute(@param) ) {
            #warn "Error inserting data for $serviceid$dir (restarting): ".
            #     $isth->errstr;
            warn "Error inserting data for $serviceid$dir: ". $isth->errstr;
            dbh->rollback; #or die dbh->errstr;
            #sleep 5;
            #next SERVICEID; #MAIN;
            exit;
          }

        }

        if ( $srv_date ne $torrus_srvderive->last_srv_date ) {
          warn "updating last_srv_date of $serviceid to $srv_date\n" if $DEBUG;
          $torrus_srvderive->last_srv_date($srv_date);
          my $error = $torrus_srvderive->replace;
          die $error if $error;
        }

        # Commit once per sample time, after both directions are inserted.
        dbh->commit or die dbh->errstr;

        $prev = $cur;
      }

      warn "done with $serviceid\n" if $DEBUG;
      exit;
      #end-of-kid
    }

  } #foreach my $torrus_srvderive

  dbh->commit or die dbh->errstr;

  myexit() if sigterm() || sigint();

  warn "restarting main loop\n" if $DEBUG > 1;
  #sleep 60 unless $found;
}

# True when the network_monitoring_system config option exists and is set
# to 'Torrus_Internal'.
sub _shouldrun {
     $conf->exists('network_monitoring_system')
  && $conf->config('network_monitoring_system') eq 'Torrus_Internal';
}

# Die with the command-line usage message.
sub usage {
  die "Usage:\n\n freeside-torrus-srvderive user\n";
}

# Non-blocking reap of finished children: for each tracked pid that has
# exited, drop it from %kids and decrement $kids.
sub reap_kids {
  foreach my $pid ( keys %kids ) {
    my $kid = waitpid($pid, WNOHANG);

    # 0 means the child is still running.  A positive value means it was
    # just reaped.  -1 with ECHILD means there is no such child any more;
    # release that slot too, or $kids could stay at $max_kids forever.
    next unless $kid > 0 || ( $kid == -1 && $!{ECHILD} );

    $kids--;
    delete $kids{$pid};
  }
}

=head1 NAME

freeside-torrus-srvderive - Freeside's Torrus virtual port daemon.

=head1 SYNOPSIS

  freeside-torrus-srvderive user

=head1 DESCRIPTION

Runs continuously, searches for samples in the srvexport table which do not
have an entry for combined virtual ports, and adds them.

=head1 SEE ALSO

=cut

1;