#!/usr/bin/perl -w
#
# freeside-queued - job queue daemon.
#
# Overview of what this script does (see also the POD at the end of the file):
#
#   1. Daemonizes, drops root and connects to the database as the user
#      named on the command line.
#   2. Loops forever: picks the oldest 'new' row in the queue table that has
#      no queue_depend rows, marks it 'locked', and forks a child to run it.
#   3. The child loads the job's class, calls the job subroutine with the
#      job's arguments, then deletes the job on success or marks it 'failed'
#      (with the error text in statustext) on failure.
#   4. At most $max_kids children run at once; finished children are reaped
#      at the top of every loop iteration.
#   5. TERM and INT signals are checked after each iteration and end the
#      daemon.

use strict;
use vars qw( $DEBUG $kids $max_kids %kids );
use POSIX qw(:sys_wait_h);
use IO::File;
use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
use FS::Record qw(qsearch qsearchs);
use FS::queue;
use FS::queue_depend;

# no autoloading just yet
use FS::cust_main;
use FS::svc_acct;
use Net::SSH 0.07;
use FS::part_export;

$DEBUG = 0;
$max_kids = '10'; #guess it should be a config file...
$kids = 0;        # number of live children; %kids holds their pids

my $user = shift or usage();

warn "starting daemonization (forking)\n" if $DEBUG;
#daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
daemonize1('freeside-queued');

warn "dropping privledges\n" if $DEBUG;
drop_root();

$ENV{HOME} = (getpwuid($>))[7]; #for ssh

warn "connecting to database\n" if $DEBUG;
# Retry until adminsuidsetup stops dying.  The trailing "1" makes the eval
# return true only when the call completed without an exception.
until ( eval { adminsuidsetup($user); 1 } ) {
  warn $@;
  warn "sleeping for reconnect...\n";
  sleep 5;
}

logfile( "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc );

warn "completing daemonization (detaching))\n" if $DEBUG;
daemonize2();

#--

my $warnkids = 0; # so the "maximum children" warning is only logged once

while (1) {

  reap_kids();

  #prevent runaway forking
  if ( $kids >= $max_kids ) {
    warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
    reap_kids();
    sleep 1; #waiting for signals is cheap
    next;
  }
  $warnkids = 0;

  unless ( dbh && dbh->ping ) {
    warn "WARNING: connection to database lost, reconnecting...\n";

    eval { $FS::UID::dbh = myconnect; };

    if ( $@ || !( dbh && dbh->ping ) ) {
      warn "WARNING: still no connection to database, sleeping for retry...\n";
      sleep 10;
      next;
    }

    warn "WARNING: reconnected to database\n";
  }

  # Find and lock one job inside a single transaction; AutoCommit is turned
  # back on once the job has been marked 'locked' and committed.
  $FS::UID::AutoCommit = 0;

  #assuming mysql 4.1 w/subqueries now
  # (older code skipped this sub-select on mysql and checked queue_depend
  #  with a separate qsearch after fetching the job)
  my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
                 ' WHERE queue_depend.jobnum = queue.jobnum ) ';

  my $job = qsearchs(
    'queue',
    { 'status' => 'new' },
    '',
    driver_name eq 'mysql'
      ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
      : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
  );

  unless ( $job ) {
    # nothing to do: end the transaction and wait before polling again
    dbh->commit or do {
      warn "WARNING: database error, closing connection: ". dbh->errstr;
      undef $FS::UID::dbh;
      next;
    };
    sleep 5; #connecting to db is expensive
    next;
  }

  my %hash = $job->hash;
  $hash{'status'} = 'locked';
  my $ljob = FS::queue->new( \%hash );
  my $error = $ljob->replace($job);
  if ( $error ) {
    # report the error replace() actually returned rather than discarding it
    warn "WARNING: database error locking job, closing connection: $error";
    undef $FS::UID::dbh;
    next;
  }

  dbh->commit or do {
    warn "WARNING: database error, closing connection: ". dbh->errstr;
    undef $FS::UID::dbh;
    next;
  };

  $FS::UID::AutoCommit = 1;

  # A leading '_JOB' argument is a placeholder for the job object itself.
  # The defined() check keeps jobs without arguments from triggering an
  # "uninitialized value" warning.
  my @args = $ljob->args;
  splice @args, 0, 1, $ljob
    if defined $args[0] && $args[0] eq '_JOB';

  my $pid = fork;
  unless ( defined $pid ) {
    # capture $! right away: anything done by warn may overwrite it
    my $fork_error = "$!";
    warn "WARNING: can't fork: $fork_error\n";
    my $fail_error =
      fail_job( $job, "[freeside-queued] can't fork: $fork_error" );
    die $fail_error if $fail_error;
    next; #don't increment the kid counter
  }

  if ( $pid ) {

    # parent: remember the child so reap_kids can collect it
    $kids++;
    $kids{$pid} = 1;

  } else { #kid time

    #get new db handle
    # InactiveDestroy keeps the child from closing the parent's connection
    # when its copy of the handle goes away
    $FS::UID::dbh->{InactiveDestroy} = 1;
    forksuidsetup($user);

    #auto-use classes...
    if (    $ljob->job =~ /(FS::part_export::\w+)::/
         || $ljob->job =~ /(FS::\w+)::/
       )
    {
      my $class = $1;
      eval "use $class;";
      my $use_error = $@; # save before warn can disturb $@
      if ( $use_error ) {
        warn "job use $class failed";
        my $fail_error = fail_job( $ljob, $use_error );
        die $fail_error if $fail_error;
        exit; #end-of-kid
      }
    }

    # NOTE(review): the job name comes from the queue table and is run via
    # string eval; this trusts everything that can write to that table.
    my $eval = "&". $ljob->job. '(@args);';
    warn "running $eval";
    eval $eval; #throw away return value?  suppose so
    my $job_error = $@; # save before warn can disturb $@
    if ( $job_error ) {
      warn "job $eval failed";
      my $fail_error = fail_job( $ljob, $job_error );
      die $fail_error if $fail_error;
    } else {
      $ljob->delete;
    }

    exit; #end-of-kid
  }

} continue {
  if ( sigterm() ) { warn "received TERM signal; exiting\n"; exit; }
  if ( sigint()  ) { warn "received INT signal; exiting\n";  exit; }
}

# usage - die with the command line synopsis.
sub usage {
  die "Usage:\n\n  freeside-queued user\n";
}

# fail_job OLD_JOB, STATUSTEXT
#
# Replace OLD_JOB (an FS::queue record) with a copy whose status is 'failed'
# and whose statustext is STATUSTEXT.  Returns whatever replace() returns;
# callers in this file treat a true value as an error message.
sub fail_job {
  my( $old_job, $statustext ) = @_;

  my %hash = $old_job->hash;
  $hash{'status'}     = 'failed';
  $hash{'statustext'} = $statustext;

  my $failed_job = FS::queue->new( \%hash );
  return $failed_job->replace($old_job);
}

# reap_kids - collect finished children without blocking.
#
# Polls every pid recorded in %kids with waitpid(WNOHANG).  A pid is dropped
# from %kids (and $kids decremented) when waitpid reports that the child has
# exited, or when it reports ECHILD, i.e. the pid is no longer a child of
# this process; otherwise that slot would count against $max_kids forever.
sub reap_kids {
  foreach my $pid ( keys %kids ) {
    my $kid = waitpid($pid, WNOHANG);
    next unless $kid > 0 || ( $kid == -1 && $!{ECHILD} );
    $kids--;
    delete $kids{$pid};
  }
}

=head1 NAME

freeside-queued - Job queue daemon

=head1 SYNOPSIS

  freeside-queued user

=head1 DESCRIPTION

Job queue daemon.  Should be running at all times.

user: from the mapsecrets file - see config.html from the base documentation

=head1 VERSION

=head1 BUGS

=head1 SEE ALSO

=cut