diff options
| -rw-r--r-- | FS/bin/freeside-queued | 191 | 
1 files changed, 91 insertions, 100 deletions
| diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 4093e5aa8..2188dd404 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -6,7 +6,8 @@ use POSIX qw(:sys_wait_h);  use IO::File;  use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);  use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); -use FS::Record qw(qsearch qsearchs); +use FS::Conf; +use FS::Record qw(qsearch);  use FS::queue;  use FS::queue_depend; @@ -15,7 +16,6 @@ use Net::SSH 0.07;  $DEBUG = 0; -$max_kids = '10'; #guess it should be a config file...  $kids = 0;  my $user = shift or die &usage; @@ -27,7 +27,6 @@ daemonize1('freeside-queued');  warn "dropping privledges\n" if $DEBUG;  drop_root(); -  $ENV{HOME} = (getpwuid($>))[7]; #for ssh  warn "connecting to database\n" if $DEBUG; @@ -48,6 +47,9 @@ daemonize2();  #-- +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; +  my $warnkids=0;  while (1) { @@ -81,130 +83,119 @@ while (1) {    #  local $FS::UID::AutoCommit = 0;    $FS::UID::AutoCommit = 0; -  #assuming mysql 4.1 w/subqueries now -  #my $nodepend = driver_name eq 'mysql' -  # ? '' -  # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. -  #   ' WHERE queue_depend.jobnum = queue.jobnum ) '; -  my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. -                 '           WHERE queue_depend.jobnum = queue.jobnum ) '; +  my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. +                 '           WHERE queue_depend.jobnum = queue.jobnum )';    #anything with a priority goes after stuff without one    my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; +  my $limit = $max_kids - $kids; +    $order_by .= ( driver_name eq 'mysql' -                   ? ' LIMIT 1 FOR UPDATE ' -                   : ' FOR UPDATE LIMIT 1 ' ); - -  my $job = qsearchs({ -    'table'      => 'queue', -    'hashref'    => { 'status' => 'new' }, -    'extra_sql'  => $nodepend, -    'order_by'   => $order_by, -  }) or do { -    # if $oldAutoCommit { +                   ? " LIMIT $limit FOR UPDATE " +                   : " FOR UPDATE LIMIT $limit " ); + +  my @jobs = qsearch({ +    'table'     => 'queue', +    'hashref'   => { 'status' => 'new' }, +    'extra_sql' => $nodepend, +    'order_by'  => $order_by, +  }); + +  unless ( @jobs ) {      dbh->commit or do {        warn "WARNING: database error, closing connection: ". dbh->errstr;        undef $FS::UID::dbh;        next;      }; -    # } -    sleep 5; #connecting to db is expensive -    next; -  }; - -  #assuming mysql 4.1 w/subqueries now -  #if ( driver_name eq 'mysql' -  #     && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) { -  #  dbh->commit or die dbh->errstr; #if $oldAutoCommit; -  #  sleep 5; #would be better if mysql could do everything in query above -  #  next; -  #} - -  my %hash = $job->hash; -  $hash{'status'} = 'locked'; -  my $ljob = new FS::queue ( \%hash ); -  my $error = $ljob->replace($job); -  if ( $error ) { -    warn "WARNING: database error locking job, closing connection: ". -         dbh->errstr; -    undef $FS::UID::dbh; +    sleep 1;      next;    } -  # if $oldAutoCommit { -  dbh->commit or do { -    warn "WARNING: database error, closing connection: ". dbh->errstr; -    undef $FS::UID::dbh; -    next; -  }; -  # } - -  $FS::UID::AutoCommit = 1; -  #}  +  foreach my $job ( @jobs ) { -  my @args = $ljob->args; -  splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; - -  defined( my $pid = fork ) or do { -    warn "WARNING: can't fork: $!\n";      my %hash = $job->hash; -    $hash{'status'} = 'failed'; -    $hash{'statustext'} = "[freeside-queued] can't fork: $!"; +    $hash{'status'} = 'locked';      my $ljob = new FS::queue ( \%hash );      my $error = $ljob->replace($job); -    die $error if $error; -    next; #don't increment the kid counter -  }; - -  if ( $pid ) { -    $kids++; -    $kids{$pid} = 1; -  } else { #kid time - -    #get new db handle -    $FS::UID::dbh->{InactiveDestroy} = 1; - -    forksuidsetup($user); - -    #auto-use classes... -    #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { -    if (    $ljob->job =~ /(FS::part_export::\w+)::/ -         || $ljob->job =~ /(FS::\w+)::/ -       ) -    { -      my $class = $1; -      eval "use $class;"; +    if ( $error ) { +      warn "WARNING: database error locking job, closing connection: ". +           dbh->errstr; +      undef $FS::UID::dbh; +      next; +    } + +    dbh->commit or do { +      warn "WARNING: database error, closing connection: ". dbh->errstr; +      undef $FS::UID::dbh; +      next; +    }; + +    $FS::UID::AutoCommit = 1; + +    my @args = $ljob->args; +    splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + +    defined( my $pid = fork ) or do { +      warn "WARNING: can't fork: $!\n"; +      my %hash = $job->hash; +      $hash{'status'} = 'failed'; +      $hash{'statustext'} = "[freeside-queued] can't fork: $!"; +      my $ljob = new FS::queue ( \%hash ); +      my $error = $ljob->replace($job); +      die $error if $error; +      next; #don't increment the kid counter +    }; + +    if ( $pid ) { +      $kids++; +      $kids{$pid} = 1; +    } else { #kid time + +      #get new db handle +      $FS::UID::dbh->{InactiveDestroy} = 1; + +      forksuidsetup($user); + +      #auto-use classes... +      if (    $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ +           || $ljob->job =~ /(FS::\w+)::/ +         ) +      { +        my $class = $1; +        eval "use $class;"; +        if ( $@ ) { +          warn "job use $class failed"; +          my %hash = $ljob->hash; +          $hash{'status'} = 'failed'; +          $hash{'statustext'} = $@; +          my $fjob = new FS::queue( \%hash ); +          my $error = $fjob->replace($ljob); +          die $error if $error; +          exit; #end-of-kid +        }; +      } + +      my $eval = "&". $ljob->job. '(@args);'; +      warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; +      eval $eval; #throw away return value?  suppose so        if ( $@ ) { -        warn "job use $class failed"; +        warn "job $eval failed";          my %hash = $ljob->hash;          $hash{'status'} = 'failed';          $hash{'statustext'} = $@;          my $fjob = new FS::queue( \%hash );          my $error = $fjob->replace($ljob);          die $error if $error; -        exit; #end-of-kid -      }; -    } +      } else { +        $ljob->delete; +      } -    my $eval = "&". $ljob->job. '(@args);'; -    warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; -    eval $eval; #throw away return value?  suppose so -    if ( $@ ) { -      warn "job $eval failed"; -      my %hash = $ljob->hash; -      $hash{'status'} = 'failed'; -      $hash{'statustext'} = $@; -      my $fjob = new FS::queue( \%hash ); -      my $error = $fjob->replace($ljob); -      die $error if $error; -    } else { -      $ljob->delete; +      exit; +      #end-of-kid      } -    exit; -    #end-of-kid -  } +  } #foreach my $job  } continue {    if ( sigterm() ) { | 
