#!/usr/bin/perl -w
#
# freeside-queued - job queue daemon.
#
# Polls the "queue" table for jobs with status 'new' that have no rows in
# queue_depend, marks each one 'locked', and forks a child process to run it.
# At most $max_kids children run at once.  See the POD at the end of the file
# for command-line usage.

use strict;
use POSIX qw(:sys_wait_h);
use IO::File;
use Getopt::Std;
use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
use FS::Conf;
use FS::Record qw(qsearch);
use FS::queue;
use FS::queue_depend;
use FS::queue_stat;
use FS::Log;
use FS::Cron::expire_user_pref qw( expire_user_pref );

# no autoloading for non-FS classes...
use Net::SSH 0.07;

# $kids  - number of child processes believed to be running
# %kids  - pid => 1 for every child forked and not yet reaped
our ( $DEBUG, $kids, $max_kids, $sleep_time, %kids );
our %opt;

$DEBUG = 0;

$kids = 0;

untaint_argv();  #what it sounds like  (eww)
getopts('sn', \%opt );

my $user = shift or usage();

warn "starting daemonization (forking)\n" if $DEBUG;
#daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
daemonize1('freeside-queued');

warn "dropping privledges\n" if $DEBUG;
drop_root();

$ENV{HOME} = (getpwuid($>))[7]; #for ssh

# Keep retrying the initial database connection until it succeeds.
warn "connecting to database\n" if $DEBUG;
$@ = 'not connected';
while ( $@ ) {
  eval { adminsuidsetup($user); };
  if ( $@ ) {
    warn $@;
    warn "sleeping for reconnect...\n";
    sleep 5;
  }
}

my $log = FS::Log->new('queue');
logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc );

warn "completing daemonization (detaching))\n" if $DEBUG;
daemonize2();

#--

my $conf = FS::Conf->new;
$max_kids   = $conf->config('queued-max_kids')   || 10;
$sleep_time = $conf->config('queued-sleep_time') || 10;

my $warnkids = 0;

MAIN_LOOP:
while (1) {

  reap_kids();

  #prevent runaway forking
  if ( $kids >= $max_kids ) {
    warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
    reap_kids();
    expire_user_pref() unless $warnkids % 10;
    sleep 1; #waiting for signals is cheap
    next MAIN_LOOP;
  }
  $warnkids = 0;

  # (Re)establish the database connection if it has gone away or was
  # deliberately dropped (undef $FS::UID::dbh) by an error path below.
  unless ( dbh && dbh->ping ) {
    warn "WARNING: connection to database lost, reconnecting...\n";

    eval { $FS::UID::dbh = myconnect; };

    unless ( !$@ && dbh && dbh->ping ) {
      warn "WARNING: still no connection to database, sleeping for retry...\n";
      sleep 10;
      next MAIN_LOOP;
    } else {
      warn "WARNING: reconnected to database\n";
    }
  }

  #my($job, $ljob);
  #{
  #  my $oldAutoCommit = $FS::UID::AutoCommit;
  #  local $FS::UID::AutoCommit = 0;
  $FS::UID::AutoCommit = 0;

  my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'.
                 ' WHERE queue_depend.jobnum = queue.jobnum )';

  #anything with a priority goes after stuff without one
  my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC ';

  # Only fetch as many jobs as there are free child slots.
  my $limit = $max_kids - $kids;

  $order_by .= ( driver_name eq 'mysql'
                   ? " LIMIT $limit FOR UPDATE "
                   : " FOR UPDATE LIMIT $limit " );

  # -s restricts to secure jobs, -n to non-secure ones; neither takes both.
  my $hashref = { 'status' => 'new' };
  if ( $opt{'s'} ) {
    $hashref->{'secure'} = 'Y';
  } elsif ( $opt{'n'} ) {
    $hashref->{'secure'} = '';
  }

  #qsearch dies when the db goes away
  my @jobs = eval {
    qsearch({
      'table'     => 'queue',
      'hashref'   => $hashref,
      'extra_sql' => $nodepend,
      'order_by'  => $order_by,
    });
  };
  if ( $@ ) {
    warn "WARNING: error searching for jobs, closing connection: $@";
    undef $FS::UID::dbh;
    next MAIN_LOOP;
  }

  unless ( @jobs ) {
    dbh->commit or do {
      warn "WARNING: database error, closing connection: ". dbh->errstr;
      undef $FS::UID::dbh;
      next MAIN_LOOP;
    };
    expire_user_pref();
    sleep $sleep_time;
    next MAIN_LOOP;
  }

  JOB:
  foreach my $job ( @jobs ) {

    my $start_date = time;

    $log->debug('locking queue job', object => $job);

    # Mark the job 'locked' before forking.
    my %hash = $job->hash;
    $hash{'status'} = 'locked';
    my $ljob = FS::queue->new( \%hash );
    my $error = $ljob->replace($job);
    if ( $error ) {
      warn "WARNING: database error locking job, closing connection: ".
           dbh->errstr;
      undef $FS::UID::dbh;
      # The handle is gone, so the remaining jobs in this batch cannot be
      # processed; go back to the outer loop so the reconnect logic runs.
      next MAIN_LOOP;
    }

    dbh->commit or do {
      warn "WARNING: database error, closing connection: ". dbh->errstr;
      undef $FS::UID::dbh;
      next MAIN_LOOP;
    };

    # Stays on for the rest of this batch; reset to 0 at the top of MAIN_LOOP.
    $FS::UID::AutoCommit = 1;

    my @args = eval { $ljob->args; };
    if ( $@ ) {
      warn "WARNING: error retrieving job arguments, closing connection: $@";
      undef $FS::UID::dbh;
      next MAIN_LOOP;
    }

    # A leading '_JOB' placeholder argument is replaced by the job object.
    splice @args, 0, 1, $ljob
      if @args && defined $args[0] && $args[0] eq '_JOB';

    defined( my $pid = fork ) or do {
      # Capture $! right away; the calls below may overwrite it.
      my $fork_error = "$!";
      warn "WARNING: can't fork: $fork_error\n";
      my %failed = $job->hash;
      $failed{'status'} = 'failed';
      $failed{'statustext'} = "[freeside-queued] can't fork: $fork_error";
      my $fjob = FS::queue->new( \%failed );
      my $ferror = $fjob->replace($job);
      die $ferror if $ferror; #XXX still dying if we can't fork AND we can't connect to the db
      next JOB; #don't increment the kid counter
    };

    if ( $pid ) {

      # parent: remember the child so reap_kids() can collect it
      $kids++;
      $kids{$pid} = 1;

    } else { #kid time

      #get new db handle
      $FS::UID::dbh->{InactiveDestroy} = 1;

      forksuidsetup($user);

      dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile');

      #auto-use classes...
      if (    $ljob->job =~ /(FS::(part_export|cust_main|cust_pkg|part_pkg|Cron)::\w+)::/
           || $ljob->job =~ /(FS::\w+)::/
         )
      {
        my $class = $1;
        # NOTE(review): string eval of a class name taken from the job name;
        # the regexes above restrict it to FS::-prefixed word characters.
        eval "use $class;";
        if ( $@ ) {
          warn "job use $class failed";
          my %hash = $ljob->hash;
          $hash{'status'} = 'failed';
          $hash{'statustext'} = $@;
          my $fjob = FS::queue->new( \%hash );
          my $error = $fjob->replace($ljob);
          die $error if $error;
          exit; #end-of-kid
        };
      }

      my $eval = "&". $ljob->job. '(@args);';
      # don't put @args in the log, may expose passwords
      $log->info('starting job ('.$ljob->job.')');
      warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
      local $FS::UID::AutoCommit = 0; # so that we can clean up failures
      do {
        # switch user only if a job user is available
        local $FS::CurrentUser::CurrentUser = $ljob->access_user || $FS::CurrentUser::CurrentUser;
        eval $eval; #throw away return value?  suppose so
      };
      if ( $@ ) {
        dbh->rollback;
        my %hash = $ljob->hash;
        $hash{'statustext'} = $@;
        if ( $hash{'statustext'} =~ /\/misc\/queued_report/ ) { #use return?
          $hash{'status'} = 'done';
        } else {
          $hash{'status'} = 'failed';
          warn "job $eval failed";
        }
        my $fjob = FS::queue->new( \%hash );
        my $error = $fjob->replace($ljob);
        die $error if $error;
        dbh->commit; # for the status change only
      } else {
        $ljob->delete;
        dbh->commit; # for the job itself
      }

      # Record timing statistics for queued billing jobs only.
      if ( $ljob->job eq 'FS::cust_main::queued_bill' ) {
        my $queue_stat = FS::queue_stat->new({
          'jobnum'      => $ljob->jobnum,
          'job'         => $ljob->job,
          'custnum'     => $ljob->custnum,
          'insert_date' => $ljob->_date,
          'start_date'  => $start_date,
          'end_date'    => time,
        });
        my $error = $queue_stat->insert;
        die $error if $error;
        dbh->commit; #for the stat
      }

      # Dump profiling data when the handle provides sprintProfile.
      if ( UNIVERSAL::can(dbh, 'sprintProfile') ) {
        open( my $profile_fh, '>',
              "%%%FREESIDE_LOG%%%/queueprofile.$$.". time )
          or die "can't open profile file: $!";
        print {$profile_fh} dbh->sprintProfile();
        close $profile_fh or die "can't close profile file: $!";
      }

      exit; #end-of-kid
    }

  } #foreach my $job

} continue {
  # Checked after every pass of MAIN_LOOP, including early "next" exits.
  if ( sigterm() ) { warn "received TERM signal; exiting\n"; exit; }
  if ( sigint()  ) { warn "received INT signal; exiting\n"; exit; }
}

# Untaint every element of @ARGV in place.  The pattern accepts any
# single-line value; an argument it does not match is fatal.
sub untaint_argv {
  foreach my $i ( 0 .. $#ARGV ) { #untaint @ARGV
    #$ARGV[$i] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$i]\"";
    # Date::Parse
    $ARGV[$i] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$i]\"";
    $ARGV[$i] = $1;
  }
}

# Die with the usage message.  Never returns.
sub usage {
  die "Usage:\n\n freeside-queued user\n";
}

# Non-blocking reap of finished children: for every pid recorded in %kids,
# poll it with waitpid(WNOHANG) and, if it has exited, drop it from %kids and
# decrement $kids.  A pid the kernel no longer knows as our child (waitpid
# returns -1 with ECHILD) is dropped as well, so the counter cannot stay
# inflated forever.
sub reap_kids {
  foreach my $pid ( keys %kids ) {
    my $kid = waitpid($pid, WNOHANG);
    if ( $kid > 0 || ( $kid == -1 && $!{ECHILD} ) ) {
      $kids--;
      delete $kids{$pid};
    }
  }
}

=head1 NAME

freeside-queued - Job queue daemon

=head1 SYNOPSIS

  freeside-queued [ -s | -n ] user

=head1 DESCRIPTION

Job queue daemon.  Should be running at all times.

-s: "secure" jobs only (queued billing jobs)

-n: non-"secure" jobs only (other jobs)

user: Typically "fs_queue"

=head1 VERSION

=head1 BUGS

=head1 SEE ALSO

=cut