X-Git-Url: http://git.freeside.biz/gitweb/?p=freeside.git;a=blobdiff_plain;f=FS%2Fbin%2Ffreeside-queued;h=36871b295f24d9216e19d52d9d79adc5e7174de8;hp=815def49d94df78ae3784faab30c273a7d4c8025;hb=6de06472ab43534bd889e531ae060bbd4c935518;hpb=54d73dfad0b27edd10ec7c917a96c88d45ad6789 diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 815def49d..36871b295 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -1,23 +1,31 @@ #!/usr/bin/perl -w use strict; -use vars qw( $DEBUG $kids $max_kids %kids ); +use vars qw( $DEBUG $kids $max_kids $sleep_time %kids ); use POSIX qw(:sys_wait_h); use IO::File; +use Getopt::Std; use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); -use FS::Record qw(qsearch qsearchs); +use FS::Conf; +use FS::Record qw(qsearch); use FS::queue; use FS::queue_depend; +use FS::queue_stat; +use FS::Log; +use FS::Cron::expire_user_pref qw( expire_user_pref ); # no autoloading for non-FS classes... use Net::SSH 0.07; $DEBUG = 0; -$max_kids = '10'; #guess it should be a config file... $kids = 0; +&untaint_argv; #what it sounds like (eww) +use vars qw(%opt); +getopts('sn', \%opt ); + my $user = shift or die &usage; warn "starting daemonization (forking)\n" if $DEBUG; @@ -27,7 +35,6 @@ daemonize1('freeside-queued'); warn "dropping privledges\n" if $DEBUG; drop_root(); - $ENV{HOME} = (getpwuid($>))[7]; #for ssh warn "connecting to database\n" if $DEBUG; @@ -41,6 +48,7 @@ while ( $@ ) { } } +my $log = FS::Log->new('queue'); logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc ); warn "completing daemonization (detaching))\n" if $DEBUG; @@ -48,6 +56,10 @@ daemonize2(); #-- +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; +$sleep_time = $conf->config('queued-sleep_time') || 10; + my $warnkids=0; while (1) { @@ -56,6 +68,7 @@ while (1) { if ( $kids >= $max_kids ) { warn "WARNING: maximum $kids children reached\n" unless $warnkids++; &reap_kids; + expire_user_pref() unless $warnkids % 10; sleep 1; #waiting for signals is cheap next; } @@ -81,125 +94,181 @@ while (1) { # local $FS::UID::AutoCommit = 0; $FS::UID::AutoCommit = 0; - #assuming mysql 4.1 w/subqueries now - #my $nodepend = driver_name eq 'mysql' - # ? '' - # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. - # ' WHERE queue_depend.jobnum = queue.jobnum ) '; - my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. - ' WHERE queue_depend.jobnum = queue.jobnum ) '; - - my $job = qsearchs( - 'queue', - { 'status' => 'new' }, - '', - driver_name eq 'mysql' - ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE" - : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1" - ) or do { - # if $oldAutoCommit { + my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. + ' WHERE queue_depend.jobnum = queue.jobnum )'; + + #anything with a priority goes after stuff without one + my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + + my $limit = $max_kids - $kids; + + $order_by .= ( driver_name eq 'mysql' + ? " LIMIT $limit FOR UPDATE " + : " FOR UPDATE LIMIT $limit " ); + + my $hashref = { 'status' => 'new' }; + if ( $opt{'s'} ) { + $hashref->{'secure'} = 'Y'; + } elsif ( $opt{'n'} ) { + $hashref->{'secure'} = ''; + } + + #qsearch dies when the db goes away + my @jobs = eval { + qsearch({ + 'table' => 'queue', + 'hashref' => $hashref, + 'extra_sql' => $nodepend, + 'order_by' => $order_by, + }); + }; + if ( $@ ) { + warn "WARNING: error searching for jobs, closing connection: $@"; + undef $FS::UID::dbh; + next; + } + + unless ( @jobs ) { dbh->commit or do { warn "WARNING: database error, closing connection: ". dbh->errstr; undef $FS::UID::dbh; next; }; - # } - sleep 5; #connecting to db is expensive - next; - }; - - #assuming mysql 4.1 w/subqueries now - #if ( driver_name eq 'mysql' - # && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) { - # dbh->commit or die dbh->errstr; #if $oldAutoCommit; - # sleep 5; #would be better if mysql could do everything in query above - # next; - #} - - my %hash = $job->hash; - $hash{'status'} = 'locked'; - my $ljob = new FS::queue ( \%hash ); - my $error = $ljob->replace($job); - if ( $error ) { - warn "WARNING: database error locking job, closing connection: ". - dbh->errstr; - undef $FS::UID::dbh; + expire_user_pref(); + sleep $sleep_time; next; } - # if $oldAutoCommit { - dbh->commit or do { - warn "WARNING: database error, closing connection: ". dbh->errstr; - undef $FS::UID::dbh; - next; - }; - # } + foreach my $job ( @jobs ) { - $FS::UID::AutoCommit = 1; - #} + my $start_date = time; - my @args = $ljob->args; - splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + $log->debug('locking queue job', object => $job); - defined( my $pid = fork ) or do { - warn "WARNING: can't fork: $!\n"; my %hash = $job->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + $hash{'status'} = 'locked'; my $ljob = new FS::queue ( \%hash ); my $error = $ljob->replace($job); - die $error if $error; - next; #don't increment the kid counter - }; + if ( $error ) { + warn "WARNING: database error locking job, closing connection: ". + dbh->errstr; + undef $FS::UID::dbh; + next; + } - if ( $pid ) { - $kids++; - $kids{$pid} = 1; - } else { #kid time + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; - #get new db handle - $FS::UID::dbh->{InactiveDestroy} = 1; + $FS::UID::AutoCommit = 1; - forksuidsetup($user); + my @args = eval { $ljob->args; }; + if ( $@ ) { + warn "WARNING: error retrieving job arguments, closing connection: $@"; + undef $FS::UID::dbh; + next; + } + splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + die $error if $error; #XXX still dying if we can't fork AND we can't connect to the db + next; #don't increment the kid counter + }; - #auto-use classes... - #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { - if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ - || $ljob->job =~ /(FS::\w+)::/ - ) - { - my $class = $1; - eval "use $class;"; + if ( $pid ) { + $kids++; + $kids{$pid} = 1; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile'); + + #auto-use classes... + if ( $ljob->job =~ /(FS::(part_export|cust_main|cust_pkg|part_pkg|Cron)::\w+)::/ + || $ljob->job =~ /(FS::\w+)::/ + ) + { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + # don't put @args in the log, may expose passwords + $log->info('starting job ('.$ljob->job.')'); + warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; + local $FS::UID::AutoCommit = 0; # so that we can clean up failures + do { + # switch user only if a job user is available + local $FS::CurrentUser::CurrentUser = $ljob->access_user || $FS::CurrentUser::CurrentUser; + eval $eval; #throw away return value? suppose so + }; if ( $@ ) { - warn "job use $class failed"; + dbh->rollback; my %hash = $ljob->hash; - $hash{'status'} = 'failed'; $hash{'statustext'} = $@; + if ( $hash{'statustext'} =~ /\/misc\/queued_report/ ) { #use return? + $hash{'status'} = 'done'; + } else { + $hash{'status'} = 'failed'; + warn "job $eval failed"; + } my $fjob = new FS::queue( \%hash ); my $error = $fjob->replace($ljob); die $error if $error; - exit; #end-of-kid - }; + dbh->commit; # for the status change only + } else { + $ljob->delete; + dbh->commit; # for the job itself + } + + if ( $ljob->job eq 'FS::cust_main::queued_bill' ) { + my $queue_stat = new FS::queue_stat { + 'jobnum' => $ljob->jobnum, + 'job' => $ljob->job, + 'custnum' => $ljob->custnum, + 'insert_date' => $ljob->_date, + 'start_date' => $start_date, + 'end_date' => time, + }; + my $error = $queue_stat->insert; + die $error if $error; + dbh->commit; #for the stat + } + + if ( UNIVERSAL::can(dbh, 'sprintProfile') ) { + open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time) + or die "can't open profile file: $!"; + print PROFILE dbh->sprintProfile(); + close PROFILE or die "can't close profile file: $!"; + } + + exit; + #end-of-kid } - my $eval = "&". $ljob->job. '(@args);'; - warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; - eval $eval; #throw away return value? suppose so - if ( $@ ) { - warn "job $eval failed"; - my %hash = $ljob->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = $@; - my $fjob = new FS::queue( \%hash ); - my $error = $fjob->replace($ljob); - die $error if $error; - } else { - $ljob->delete; - } - - exit; - #end-of-kid - } + } #foreach my $job } continue { if ( sigterm() ) { @@ -212,6 +281,15 @@ while (1) { } } +sub untaint_argv { + foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV + #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + # Date::Parse + $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + $ARGV[$_]=$1; + } +} + sub usage { die "Usage:\n\n freeside-queued user\n"; } @@ -232,13 +310,17 @@ freeside-queued - Job queue daemon =head1 SYNOPSIS - freeside-queued user + freeside-queued [ -s | -n ] user =head1 DESCRIPTION Job queue daemon. Should be running at all times. -user: from the mapsecrets file - see config.html from the base documentation +-s: "secure" jobs only (queued billing jobs) + +-n: non-"secure" jobs only (other jobs) + +user: Typically "fs_queue" =head1 VERSION