diff options
Diffstat (limited to 'FS/bin/freeside-queued')
-rw-r--r-- | FS/bin/freeside-queued | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued new file mode 100644 index 000000000..93d735d1a --- /dev/null +++ b/FS/bin/freeside-queued @@ -0,0 +1,250 @@ +#!/usr/bin/perl -w + +use strict; +use vars qw( $DEBUG $kids $max_kids %kids ); +use POSIX qw(:sys_wait_h); +use IO::File; +use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); +use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); +use FS::Record qw(qsearch qsearchs); +use FS::queue; +use FS::queue_depend; + +# no autoloading for non-FS classes... +use Net::SSH 0.07; + +$DEBUG = 0; + +$max_kids = '10'; #guess it should be a config file... +$kids = 0; + +my $user = shift or die &usage; + +warn "starting daemonization (forking)\n" if $DEBUG; +#daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs +daemonize1('freeside-queued'); + +warn "dropping privledges\n" if $DEBUG; +drop_root(); + + +$ENV{HOME} = (getpwuid($>))[7]; #for ssh + +warn "connecting to database\n" if $DEBUG; +$@ = 'not connected'; +while ( $@ ) { + eval { adminsuidsetup $user; }; + if ( $@ ) { + warn $@; + warn "sleeping for reconnect...\n"; + sleep 5; + } +} + +logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc ); + +warn "completing daemonization (detaching))\n" if $DEBUG; +daemonize2(); + +#-- + +my $warnkids=0; +while (1) { + + &reap_kids; + #prevent runaway forking + if ( $kids >= $max_kids ) { + warn "WARNING: maximum $kids children reached\n" unless $warnkids++; + &reap_kids; + sleep 1; #waiting for signals is cheap + next; + } + $warnkids=0; + + unless ( dbh && dbh->ping ) { + warn "WARNING: connection to database lost, reconnecting...\n"; + + eval { $FS::UID::dbh = myconnect; }; + + unless ( !$@ && dbh && dbh->ping ) { + warn "WARNING: still no connection to database, sleeping for retry...\n"; + sleep 10; + next; + } else { + warn "WARNING: reconnected to database\n"; + } + } + + #my($job, $ljob); + #{ + # my $oldAutoCommit = $FS::UID::AutoCommit; + # local $FS::UID::AutoCommit = 0; + $FS::UID::AutoCommit = 0; + + #assuming mysql 4.1 w/subqueries now + #my $nodepend = driver_name eq 'mysql' + # ? '' + # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. + # ' WHERE queue_depend.jobnum = queue.jobnum ) '; + my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. + ' WHERE queue_depend.jobnum = queue.jobnum ) '; + + my $job = qsearchs( + 'queue', + { 'status' => 'new' }, + '', + driver_name eq 'mysql' + ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE" + : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1" + ) or do { + # if $oldAutoCommit { + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + # } + sleep 5; #connecting to db is expensive + next; + }; + + #assuming mysql 4.1 w/subqueries now + #if ( driver_name eq 'mysql' + # && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) { + # dbh->commit or die dbh->errstr; #if $oldAutoCommit; + # sleep 5; #would be better if mysql could do everything in query above + # next; + #} + + my %hash = $job->hash; + $hash{'status'} = 'locked'; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + if ( $error ) { + warn "WARNING: database error locking job, closing connection: ". + dbh->errstr; + undef $FS::UID::dbh; + next; + } + + # if $oldAutoCommit { + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + # } + + $FS::UID::AutoCommit = 1; + #} + + my @args = $ljob->args; + splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + die $error if $error; + next; #don't increment the kid counter + }; + + if ( $pid ) { + $kids++; + $kids{$pid} = 1; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + #auto-use classes... + #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { + if ( $ljob->job =~ /(FS::part_export::\w+)::/ + || $ljob->job =~ /(FS::\w+)::/ + ) + { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; + eval $eval; #throw away return value? suppose so + if ( $@ ) { + warn "job $eval failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + } else { + $ljob->delete; + } + + exit; + #end-of-kid + } + +} continue { + if ( sigterm() ) { + warn "received TERM signal; exiting\n"; + exit; + } + if ( sigint() ) { + warn "received INT signal; exiting\n"; + exit; + } +} + +sub usage { + die "Usage:\n\n freeside-queued user\n"; +} + +sub reap_kids { + foreach my $pid ( keys %kids ) { + my $kid = waitpid($pid, WNOHANG); + if ( $kid > 0 ) { + $kids--; + delete $kids{$kid}; + } + } +} + +=head1 NAME + +freeside-queued - Job queue daemon + +=head1 SYNOPSIS + + freeside-queued user + +=head1 DESCRIPTION + +Job queue daemon. Should be running at all times. + +user: from the mapsecrets file - see config.html from the base documentation + +=head1 VERSION + +=head1 BUGS + +=head1 SEE ALSO + +=cut + |