X-Git-Url: http://git.freeside.biz/gitweb/?p=freeside.git;a=blobdiff_plain;f=FS%2Fbin%2Ffreeside-queued;h=36871b295f24d9216e19d52d9d79adc5e7174de8;hp=04101bbc2171a88d04d90ff468f3b8404c639561;hb=6de06472ab43534bd889e531ae060bbd4c935518;hpb=aac06e1cc16840ca746bb222a5c29280453df27e diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 04101bbc2..36871b295 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -1,130 +1,307 @@ #!/usr/bin/perl -w use strict; -use vars qw( $log_file $sigterm $sigint ); -use subs qw( _die _logmsg ); -use Fcntl qw(:flock); -use POSIX qw(setsid); -use Date::Format; +use vars qw( $DEBUG $kids $max_kids $sleep_time %kids ); +use POSIX qw(:sys_wait_h); use IO::File; -use FS::UID qw(adminsuidsetup); -use FS::Record qw(qsearchs); +use Getopt::Std; +use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); +use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); +use FS::Conf; +use FS::Record qw(qsearch); use FS::queue; +use FS::queue_depend; +use FS::queue_stat; +use FS::Log; +use FS::Cron::expire_user_pref qw( expire_user_pref ); -# no autoloading just yet -use FS::cust_main; -use FS::svc_acct; -use Net::SSH; +# no autoloading for non-FS classes... +use Net::SSH 0.07; -my $pid_file = '/var/run/freeside-queued.pid'; +$DEBUG = 0; -$SIG{CHLD} = sub { wait }; #zombie prevention +$kids = 0; + +&untaint_argv; #what it sounds like (eww) +use vars qw(%opt); +getopts('sn', \%opt ); my $user = shift or die &usage; -&daemonize; +warn "starting daemonization (forking)\n" if $DEBUG; +#daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs +daemonize1('freeside-queued'); - $sigterm = 0; - $sigint = 0; -$SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; }; -$SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; }; +warn "dropping privledges\n" if $DEBUG; +drop_root(); -$> = $FS::UID::freeside_uid unless $>; $ENV{HOME} = (getpwuid($>))[7]; #for ssh -adminsuidsetup $user; -$log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc; +warn "connecting to database\n" if $DEBUG; +$@ = 'not connected'; +while ( $@ ) { + eval { adminsuidsetup $user; }; + if ( $@ ) { + warn $@; + warn "sleeping for reconnect...\n"; + sleep 5; + } +} -$SIG{__DIE__} = \&_die; -$SIG{__WARN__} = \&_logmsg; +my $log = FS::Log->new('queue'); +logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc ); -warn "freesied-queued starting\n"; +warn "completing daemonization (detaching))\n" if $DEBUG; +daemonize2(); +#-- + +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; +$sleep_time = $conf->config('queued-sleep_time') || 10; + +my $warnkids=0; while (1) { - my $job = qsearchs( - 'queue', - { 'status' => 'new' }, - '', - 'ORDER BY jobnum FOR UPDATE LIMIT 1' - ) or do { - sleep 5; + &reap_kids; + #prevent runaway forking + if ( $kids >= $max_kids ) { + warn "WARNING: maximum $kids children reached\n" unless $warnkids++; + &reap_kids; + expire_user_pref() unless $warnkids % 10; + sleep 1; #waiting for signals is cheap next; - }; + } + $warnkids=0; + + unless ( dbh && dbh->ping ) { + warn "WARNING: connection to database lost, reconnecting...\n"; + + eval { $FS::UID::dbh = myconnect; }; + + unless ( !$@ && dbh && dbh->ping ) { + warn "WARNING: still no connection to database, sleeping for retry...\n"; + sleep 10; + next; + } else { + warn "WARNING: reconnected to database\n"; + } + } - my %hash = $job->hash; - $hash{'status'} = 'locked'; - my $ljob = new FS::queue ( \%hash ); - my $error = $ljob->replace($job); - die $error if $error; + #my($job, $ljob); + #{ + # my $oldAutoCommit = $FS::UID::AutoCommit; + # local $FS::UID::AutoCommit = 0; + $FS::UID::AutoCommit = 0; - my @args = $ljob->args; + my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. + ' WHERE queue_depend.jobnum = queue.jobnum )'; - #fork a child for each job (up to some maximum perhaps?) - #single-threaded for now. + #anything with a priority goes after stuff without one + my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + + my $limit = $max_kids - $kids; + + $order_by .= ( driver_name eq 'mysql' + ? " LIMIT $limit FOR UPDATE " + : " FOR UPDATE LIMIT $limit " ); + + my $hashref = { 'status' => 'new' }; + if ( $opt{'s'} ) { + $hashref->{'secure'} = 'Y'; + } elsif ( $opt{'n'} ) { + $hashref->{'secure'} = ''; + } - my $eval = "&". $ljob->job. '(@args);'; - warn "running $eval"; - eval $eval; + #qsearch dies when the db goes away + my @jobs = eval { + qsearch({ + 'table' => 'queue', + 'hashref' => $hashref, + 'extra_sql' => $nodepend, + 'order_by' => $order_by, + }); + }; if ( $@ ) { - warn "job $eval failed"; - my %hash = $ljob->hash; - $hash{'status'} = 'failed'; - my $fjob = new FS::queue( \%hash ); - my $error = $fjob->replace($ljob); - die $error if $error; - } else { - $ljob->delete; + warn "WARNING: error searching for jobs, closing connection: $@"; + undef $FS::UID::dbh; + next; + } + + unless ( @jobs ) { + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + expire_user_pref(); + sleep $sleep_time; + next; } + foreach my $job ( @jobs ) { + + my $start_date = time; + + $log->debug('locking queue job', object => $job); + + my %hash = $job->hash; + $hash{'status'} = 'locked'; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + if ( $error ) { + warn "WARNING: database error locking job, closing connection: ". + dbh->errstr; + undef $FS::UID::dbh; + next; + } + + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + + $FS::UID::AutoCommit = 1; + + my @args = eval { $ljob->args; }; + if ( $@ ) { + warn "WARNING: error retrieving job arguments, closing connection: $@"; + undef $FS::UID::dbh; + next; + } + splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + die $error if $error; #XXX still dying if we can't fork AND we can't connect to the db + next; #don't increment the kid counter + }; + + if ( $pid ) { + $kids++; + $kids{$pid} = 1; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile'); + + #auto-use classes... + if ( $ljob->job =~ /(FS::(part_export|cust_main|cust_pkg|part_pkg|Cron)::\w+)::/ + || $ljob->job =~ /(FS::\w+)::/ + ) + { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + # don't put @args in the log, may expose passwords + $log->info('starting job ('.$ljob->job.')'); + warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; + local $FS::UID::AutoCommit = 0; # so that we can clean up failures + do { + # switch user only if a job user is available + local $FS::CurrentUser::CurrentUser = $ljob->access_user || $FS::CurrentUser::CurrentUser; + eval $eval; #throw away return value? suppose so + }; + if ( $@ ) { + dbh->rollback; + my %hash = $ljob->hash; + $hash{'statustext'} = $@; + if ( $hash{'statustext'} =~ /\/misc\/queued_report/ ) { #use return? + $hash{'status'} = 'done'; + } else { + $hash{'status'} = 'failed'; + warn "job $eval failed"; + } + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + dbh->commit; # for the status change only + } else { + $ljob->delete; + dbh->commit; # for the job itself + } + + if ( $ljob->job eq 'FS::cust_main::queued_bill' ) { + my $queue_stat = new FS::queue_stat { + 'jobnum' => $ljob->jobnum, + 'job' => $ljob->job, + 'custnum' => $ljob->custnum, + 'insert_date' => $ljob->_date, + 'start_date' => $start_date, + 'end_date' => time, + }; + my $error = $queue_stat->insert; + die $error if $error; + dbh->commit; #for the stat + } + + if ( UNIVERSAL::can(dbh, 'sprintProfile') ) { + open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time) + or die "can't open profile file: $!"; + print PROFILE dbh->sprintProfile(); + close PROFILE or die "can't close profile file: $!"; + } + + exit; + #end-of-kid + } + + } #foreach my $job + } continue { - if ( $sigterm ) { + if ( sigterm() ) { warn "received TERM signal; exiting\n"; exit; } - if ( $sigint ) { + if ( sigint() ) { warn "received INT signal; exiting\n"; exit; } } -sub usage { - die "Usage:\n\n freeside-queued user\n"; -} - -sub _die { - my $msg = shift; - unlink $pid_file if -e $pid_file; - _logmsg($msg); +sub untaint_argv { + foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV + #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + # Date::Parse + $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + $ARGV[$_]=$1; + } } -sub _logmsg { - chomp( my $msg = shift ); - my $log = new IO::File ">>$log_file"; - flock($log, LOCK_EX); - seek($log, 0, 2); - print $log "[". time2str("%a %b %e %T %Y",time). "] [$$] $msg\n"; - flock($log, LOCK_UN); - close $log; +sub usage { + die "Usage:\n\n freeside-queued user\n"; } -sub daemonize { - - chdir "/" or die "Can't chdir to /: $!"; - open STDIN, '/dev/null' or die "Can't read /dev/null: $!"; - defined(my $pid = fork) or die "Can't fork: $!"; - if ( $pid ) { - print "freeside-queued started with pid $pid\n"; #logging to $log_file\n"; - exit unless $pid_file; - my $pidfh = new IO::File ">$pid_file" or exit; - print $pidfh "$pid\n"; - exit; +sub reap_kids { + foreach my $pid ( keys %kids ) { + my $kid = waitpid($pid, WNOHANG); + if ( $kid > 0 ) { + $kids--; + delete $kids{$kid}; + } } - open STDOUT, '>/dev/null' - or die "Can't write to /dev/null: $!"; - setsid or die "Can't start a new session: $!"; - open STDERR, '>&STDOUT' or die "Can't dup stdout: $!"; - } =head1 NAME @@ -133,13 +310,17 @@ freeside-queued - Job queue daemon =head1 SYNOPSIS - freeside-queued user + freeside-queued [ -s | -n ] user =head1 DESCRIPTION Job queue daemon. Should be running at all times. -user: from the mapsecrets file - see config.html from the base documentation +-s: "secure" jobs only (queued billing jobs) + +-n: non-"secure" jobs only (other jobs) + +user: Typically "fs_queue" =head1 VERSION