From a80720cc443697b1be73e0570fa40d592ccbe8bd Mon Sep 17 00:00:00 2001 From: ivan Date: Sun, 26 Apr 2009 23:19:13 +0000 Subject: [PATCH] start small jobs more efficiently, RT#4412 --- FS/bin/freeside-queued | 164 +++++++++++++++++++++++++------------------------ 1 file changed, 84 insertions(+), 80 deletions(-) diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 8c07638b3..2188dd404 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -6,7 +6,8 @@ use POSIX qw(:sys_wait_h); use IO::File; use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); -use FS::Record qw(qsearch qsearchs); +use FS::Conf; +use FS::Record qw(qsearch); use FS::queue; use FS::queue_depend; @@ -15,7 +16,6 @@ use Net::SSH 0.07; $DEBUG = 0; -$max_kids = '10'; #guess it should be a config file... $kids = 0; my $user = shift or die &usage; @@ -27,7 +27,6 @@ daemonize1('freeside-queued'); warn "dropping privledges\n" if $DEBUG; drop_root(); - $ENV{HOME} = (getpwuid($>))[7]; #for ssh warn "connecting to database\n" if $DEBUG; @@ -48,6 +47,9 @@ daemonize2(); #-- +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; + my $warnkids=0; while (1) { @@ -81,117 +83,119 @@ while (1) { # local $FS::UID::AutoCommit = 0; $FS::UID::AutoCommit = 0; - my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. + my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. ' WHERE queue_depend.jobnum = queue.jobnum )'; #anything with a priority goes after stuff without one my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + my $limit = $max_kids - $kids; + $order_by .= ( driver_name eq 'mysql' - ? ' LIMIT 1 FOR UPDATE ' - : ' FOR UPDATE LIMIT 1 ' ); + ? " LIMIT $limit FOR UPDATE " + : " FOR UPDATE LIMIT $limit " ); - my $job = qsearchs({ + my @jobs = qsearch({ 'table' => 'queue', 'hashref' => { 'status' => 'new' }, 'extra_sql' => $nodepend, 'order_by' => $order_by, - }) or do { - # if $oldAutoCommit { + }); + + unless ( @jobs ) { dbh->commit or do { warn "WARNING: database error, closing connection: ". dbh->errstr; undef $FS::UID::dbh; next; }; - # } sleep 1; next; - }; - - my %hash = $job->hash; - $hash{'status'} = 'locked'; - my $ljob = new FS::queue ( \%hash ); - my $error = $ljob->replace($job); - if ( $error ) { - warn "WARNING: database error locking job, closing connection: ". - dbh->errstr; - undef $FS::UID::dbh; - next; } - # if $oldAutoCommit { - dbh->commit or do { - warn "WARNING: database error, closing connection: ". dbh->errstr; - undef $FS::UID::dbh; - next; - }; - # } - - $FS::UID::AutoCommit = 1; - #} - - my @args = $ljob->args; - splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + foreach my $job ( @jobs ) { - defined( my $pid = fork ) or do { - warn "WARNING: can't fork: $!\n"; my %hash = $job->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + $hash{'status'} = 'locked'; my $ljob = new FS::queue ( \%hash ); my $error = $ljob->replace($job); - die $error if $error; - next; #don't increment the kid counter - }; - - if ( $pid ) { - $kids++; - $kids{$pid} = 1; - } else { #kid time - - #get new db handle - $FS::UID::dbh->{InactiveDestroy} = 1; - - forksuidsetup($user); - - #auto-use classes... - #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { - if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ - || $ljob->job =~ /(FS::\w+)::/ - ) - { - my $class = $1; - eval "use $class;"; + if ( $error ) { + warn "WARNING: database error locking job, closing connection: ". + dbh->errstr; + undef $FS::UID::dbh; + next; + } + + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + + $FS::UID::AutoCommit = 1; + + my @args = $ljob->args; + splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + die $error if $error; + next; #don't increment the kid counter + }; + + if ( $pid ) { + $kids++; + $kids{$pid} = 1; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + #auto-use classes... + if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ + || $ljob->job =~ /(FS::\w+)::/ + ) + { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; + eval $eval; #throw away return value? suppose so if ( $@ ) { - warn "job use $class failed"; + warn "job $eval failed"; my %hash = $ljob->hash; $hash{'status'} = 'failed'; $hash{'statustext'} = $@; my $fjob = new FS::queue( \%hash ); my $error = $fjob->replace($ljob); die $error if $error; - exit; #end-of-kid - }; - } + } else { + $ljob->delete; + } - my $eval = "&". $ljob->job. '(@args);'; - warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; - eval $eval; #throw away return value? suppose so - if ( $@ ) { - warn "job $eval failed"; - my %hash = $ljob->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = $@; - my $fjob = new FS::queue( \%hash ); - my $error = $fjob->replace($ljob); - die $error if $error; - } else { - $ljob->delete; + exit; + #end-of-kid } - exit; - #end-of-kid - } + } #foreach my $job } continue { if ( sigterm() ) { -- 2.11.0