X-Git-Url: http://git.freeside.biz/gitweb/?p=freeside.git;a=blobdiff_plain;f=FS%2Fbin%2Ffreeside-queued;h=36871b295f24d9216e19d52d9d79adc5e7174de8;hp=22fd7bb5ee35e7af312b20b92a5a87c960f39abe;hb=bb7e827141c9ed68f30765c9ca2ddcd1d760ad2d;hpb=70d0d44dff39cb9235cbbec1918c4ea95f0dc4c4 diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 22fd7bb5e..36871b295 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -1,15 +1,19 @@ #!/usr/bin/perl -w use strict; -use vars qw( $DEBUG $kids $max_kids %kids ); +use vars qw( $DEBUG $kids $max_kids $sleep_time %kids ); use POSIX qw(:sys_wait_h); use IO::File; +use Getopt::Std; use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); use FS::Conf; use FS::Record qw(qsearch); use FS::queue; use FS::queue_depend; +use FS::queue_stat; +use FS::Log; +use FS::Cron::expire_user_pref qw( expire_user_pref ); # no autoloading for non-FS classes... use Net::SSH 0.07; @@ -18,6 +22,10 @@ $DEBUG = 0; $kids = 0; +&untaint_argv; #what it sounds like (eww) +use vars qw(%opt); +getopts('sn', \%opt ); + my $user = shift or die &usage; warn "starting daemonization (forking)\n" if $DEBUG; @@ -40,6 +48,7 @@ while ( $@ ) { } } +my $log = FS::Log->new('queue'); logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc ); warn "completing daemonization (detaching))\n" if $DEBUG; @@ -49,6 +58,7 @@ daemonize2(); my $conf = new FS::Conf; $max_kids = $conf->config('queued-max_kids') || 10; +$sleep_time = $conf->config('queued-sleep_time') || 10; my $warnkids=0; while (1) { @@ -58,6 +68,7 @@ while (1) { if ( $kids >= $max_kids ) { warn "WARNING: maximum $kids children reached\n" unless $warnkids++; &reap_kids; + expire_user_pref() unless $warnkids % 10; sleep 1; #waiting for signals is cheap next; } @@ -95,12 +106,27 @@ while (1) { ? " LIMIT $limit FOR UPDATE " : " FOR UPDATE LIMIT $limit " ); - my @jobs = qsearch({ - 'table' => 'queue', - 'hashref' => { 'status' => 'new' }, - 'extra_sql' => $nodepend, - 'order_by' => $order_by, - }); + my $hashref = { 'status' => 'new' }; + if ( $opt{'s'} ) { + $hashref->{'secure'} = 'Y'; + } elsif ( $opt{'n'} ) { + $hashref->{'secure'} = ''; + } + + #qsearch dies when the db goes away + my @jobs = eval { + qsearch({ + 'table' => 'queue', + 'hashref' => $hashref, + 'extra_sql' => $nodepend, + 'order_by' => $order_by, + }); + }; + if ( $@ ) { + warn "WARNING: error searching for jobs, closing connection: $@"; + undef $FS::UID::dbh; + next; + } unless ( @jobs ) { dbh->commit or do { @@ -108,12 +134,17 @@ while (1) { undef $FS::UID::dbh; next; }; - sleep 1; + expire_user_pref(); + sleep $sleep_time; next; } foreach my $job ( @jobs ) { + my $start_date = time; + + $log->debug('locking queue job', object => $job); + my %hash = $job->hash; $hash{'status'} = 'locked'; my $ljob = new FS::queue ( \%hash ); @@ -133,7 +164,12 @@ while (1) { $FS::UID::AutoCommit = 1; - my @args = $ljob->args; + my @args = eval { $ljob->args; }; + if ( $@ ) { + warn "WARNING: error retrieving job arguments, closing connection: $@"; + undef $FS::UID::dbh; + next; + } splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; defined( my $pid = fork ) or do { @@ -143,7 +179,7 @@ while (1) { $hash{'statustext'} = "[freeside-queued] can't fork: $!"; my $ljob = new FS::queue ( \%hash ); my $error = $ljob->replace($job); - die $error if $error; + die $error if $error; #XXX still dying if we can't fork AND we can't connect to the db next; #don't increment the kid counter }; @@ -160,7 +196,7 @@ while (1) { dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile'); #auto-use classes... - if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ + if ( $ljob->job =~ /(FS::(part_export|cust_main|cust_pkg|part_pkg|Cron)::\w+)::/ || $ljob->job =~ /(FS::\w+)::/ ) { @@ -179,18 +215,46 @@ while (1) { } my $eval = "&". $ljob->job. '(@args);'; + # don't put @args in the log, may expose passwords + $log->info('starting job ('.$ljob->job.')'); warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; - eval $eval; #throw away return value? suppose so + local $FS::UID::AutoCommit = 0; # so that we can clean up failures + do { + # switch user only if a job user is available + local $FS::CurrentUser::CurrentUser = $ljob->access_user || $FS::CurrentUser::CurrentUser; + eval $eval; #throw away return value? suppose so + }; if ( $@ ) { - warn "job $eval failed"; + dbh->rollback; my %hash = $ljob->hash; - $hash{'status'} = 'failed'; $hash{'statustext'} = $@; + if ( $hash{'statustext'} =~ /\/misc\/queued_report/ ) { #use return? + $hash{'status'} = 'done'; + } else { + $hash{'status'} = 'failed'; + warn "job $eval failed"; + } my $fjob = new FS::queue( \%hash ); my $error = $fjob->replace($ljob); die $error if $error; + dbh->commit; # for the status change only } else { $ljob->delete; + dbh->commit; # for the job itself + } + + if ( $ljob->job eq 'FS::cust_main::queued_bill' ) { + my $queue_stat = new FS::queue_stat { + 'jobnum' => $ljob->jobnum, + 'job' => $ljob->job, + 'custnum' => $ljob->custnum, + 'insert_date' => $ljob->_date, + 'start_date' => $start_date, + 'end_date' => time, + }; + my $error = $queue_stat->insert; + die $error if $error; + dbh->commit; #for the stat } if ( UNIVERSAL::can(dbh, 'sprintProfile') ) { @@ -217,6 +281,15 @@ while (1) { } } +sub untaint_argv { + foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV + #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + # Date::Parse + $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + $ARGV[$_]=$1; + } +} + sub usage { die "Usage:\n\n freeside-queued user\n"; } @@ -237,13 +310,17 @@ freeside-queued - Job queue daemon =head1 SYNOPSIS - freeside-queued user + freeside-queued [ -s | -n ] user =head1 DESCRIPTION Job queue daemon. Should be running at all times. -user: from the mapsecrets file - see config.html from the base documentation +-s: "secure" jobs only (queued billing jobs) + +-n: non-"secure" jobs only (other jobs) + +user: Typically "fs_queue" =head1 VERSION