X-Git-Url: http://git.freeside.biz/gitweb/?p=freeside.git;a=blobdiff_plain;f=FS%2Fbin%2Ffreeside-queued;h=22fd7bb5ee35e7af312b20b92a5a87c960f39abe;hp=9c2867963eb3ab37f3302f29c528ace9fb80a837;hb=70d0d44dff39cb9235cbbec1918c4ea95f0dc4c4;hpb=898496d879275cb04d75d88ec9b78400609b2179 diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 9c2867963..22fd7bb5e 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -6,7 +6,8 @@ use POSIX qw(:sys_wait_h); use IO::File; use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); -use FS::Record qw(qsearch qsearchs); +use FS::Conf; +use FS::Record qw(qsearch); use FS::queue; use FS::queue_depend; @@ -15,7 +16,6 @@ use Net::SSH 0.07; $DEBUG = 0; -$max_kids = '10'; #guess it should be a config file... $kids = 0; my $user = shift or die &usage; @@ -27,7 +27,6 @@ daemonize1('freeside-queued'); warn "dropping privledges\n" if $DEBUG; drop_root(); - $ENV{HOME} = (getpwuid($>))[7]; #for ssh warn "connecting to database\n" if $DEBUG; @@ -41,13 +40,16 @@ while ( $@ ) { } } -logfile( "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc ); +logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc ); warn "completing daemonization (detaching))\n" if $DEBUG; daemonize2(); #-- +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; + my $warnkids=0; while (1) { @@ -81,125 +83,128 @@ while (1) { # local $FS::UID::AutoCommit = 0; $FS::UID::AutoCommit = 0; - #assuming mysql 4.1 w/subqueries now - #my $nodepend = driver_name eq 'mysql' - # ? '' - # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. - # ' WHERE queue_depend.jobnum = queue.jobnum ) '; - my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. - ' WHERE queue_depend.jobnum = queue.jobnum ) '; - - my $job = qsearchs( - 'queue', - { 'status' => 'new' }, - '', - driver_name eq 'mysql' - ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE" - : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1" - ) or do { - # if $oldAutoCommit { + my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. + ' WHERE queue_depend.jobnum = queue.jobnum )'; + + #anything with a priority goes after stuff without one + my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + + my $limit = $max_kids - $kids; + + $order_by .= ( driver_name eq 'mysql' + ? " LIMIT $limit FOR UPDATE " + : " FOR UPDATE LIMIT $limit " ); + + my @jobs = qsearch({ + 'table' => 'queue', + 'hashref' => { 'status' => 'new' }, + 'extra_sql' => $nodepend, + 'order_by' => $order_by, + }); + + unless ( @jobs ) { dbh->commit or do { warn "WARNING: database error, closing connection: ". dbh->errstr; undef $FS::UID::dbh; next; }; - # } - sleep 5; #connecting to db is expensive - next; - }; - - #assuming mysql 4.1 w/subqueries now - #if ( driver_name eq 'mysql' - # && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) { - # dbh->commit or die dbh->errstr; #if $oldAutoCommit; - # sleep 5; #would be better if mysql could do everything in query above - # next; - #} - - my %hash = $job->hash; - $hash{'status'} = 'locked'; - my $ljob = new FS::queue ( \%hash ); - my $error = $ljob->replace($job); - if ( $error ) { - warn "WARNING: database error locking job, closing connection: ". - dbh->errstr; - undef $FS::UID::dbh; + sleep 1; next; } - # if $oldAutoCommit { - dbh->commit or do { - warn "WARNING: database error, closing connection: ". dbh->errstr; - undef $FS::UID::dbh; - next; - }; - # } + foreach my $job ( @jobs ) { - $FS::UID::AutoCommit = 1; - #} - - my @args = $ljob->args; - splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; - - defined( my $pid = fork ) or do { - warn "WARNING: can't fork: $!\n"; my %hash = $job->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + $hash{'status'} = 'locked'; my $ljob = new FS::queue ( \%hash ); my $error = $ljob->replace($job); - die $error if $error; - next; #don't increment the kid counter - }; - - if ( $pid ) { - $kids++; - $kids{$pid} = 1; - } else { #kid time - - #get new db handle - $FS::UID::dbh->{InactiveDestroy} = 1; - - forksuidsetup($user); - - #auto-use classes... - #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { - if ( $ljob->job =~ /(FS::part_export::\w+)::/ - || $ljob->job =~ /(FS::\w+)::/ - ) - { - my $class = $1; - eval "use $class;"; + if ( $error ) { + warn "WARNING: database error locking job, closing connection: ". + dbh->errstr; + undef $FS::UID::dbh; + next; + } + + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + + $FS::UID::AutoCommit = 1; + + my @args = $ljob->args; + splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + die $error if $error; + next; #don't increment the kid counter + }; + + if ( $pid ) { + $kids++; + $kids{$pid} = 1; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile'); + + #auto-use classes... + if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ + || $ljob->job =~ /(FS::\w+)::/ + ) + { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; + eval $eval; #throw away return value? suppose so if ( $@ ) { - warn "job use $class failed"; + warn "job $eval failed"; my %hash = $ljob->hash; $hash{'status'} = 'failed'; $hash{'statustext'} = $@; my $fjob = new FS::queue( \%hash ); my $error = $fjob->replace($ljob); die $error if $error; - exit; #end-of-kid - }; - } - - my $eval = "&". $ljob->job. '(@args);'; - warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; - eval $eval; #throw away return value? suppose so - if ( $@ ) { - warn "job $eval failed"; - my %hash = $ljob->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = $@; - my $fjob = new FS::queue( \%hash ); - my $error = $fjob->replace($ljob); - die $error if $error; - } else { - $ljob->delete; + } else { + $ljob->delete; + } + + if ( UNIVERSAL::can(dbh, 'sprintProfile') ) { + open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time) + or die "can't open profile file: $!"; + print PROFILE dbh->sprintProfile(); + close PROFILE or die "can't close profile file: $!"; + } + + exit; + #end-of-kid } - exit; - #end-of-kid - } + } #foreach my $job } continue { if ( sigterm() ) {