diff options
Diffstat (limited to 'FS/bin/freeside-queued')
| -rw-r--r-- | FS/bin/freeside-queued | 293 | 
1 files changed, 293 insertions, 0 deletions
| diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued new file mode 100644 index 000000000..d5d84cced --- /dev/null +++ b/FS/bin/freeside-queued @@ -0,0 +1,293 @@ +#!/usr/bin/perl -w + +use strict; +use vars qw( $DEBUG $kids $max_kids %kids ); +use POSIX qw(:sys_wait_h); +use IO::File; +use Getopt::Std; +use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); +use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); +use FS::Conf; +use FS::Record qw(qsearch); +use FS::queue; +use FS::queue_depend; + +# no autoloading for non-FS classes... +use Net::SSH 0.07; + +$DEBUG = 0; + +$kids = 0; + +&untaint_argv;  #what it sounds like  (eww) +use vars qw(%opt); +getopts('sn', \%opt ); + +my $user = shift or die &usage; + +warn "starting daemonization (forking)\n" if $DEBUG; +#daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs +daemonize1('freeside-queued'); + +warn "dropping privledges\n" if $DEBUG; +drop_root(); + +$ENV{HOME} = (getpwuid($>))[7]; #for ssh + +warn "connecting to database\n" if $DEBUG; +$@ = 'not connected'; +while ( $@ ) { +  eval { adminsuidsetup $user; }; +  if ( $@ ) { +    warn $@; +    warn "sleeping for reconnect...\n"; +    sleep 5; +  } +} + +logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc ); + +warn "completing daemonization (detaching))\n" if $DEBUG; +daemonize2(); + +#-- + +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; + +my $warnkids=0; +while (1) { + +  &reap_kids; +  #prevent runaway forking +  if ( $kids >= $max_kids ) { +    warn "WARNING: maximum $kids children reached\n" unless $warnkids++; +    &reap_kids; +    sleep 1; #waiting for signals is cheap +    next; +  } +  $warnkids=0; + +  unless ( dbh && dbh->ping ) { +    warn "WARNING: connection to database lost, reconnecting...\n"; + +    eval { $FS::UID::dbh = myconnect; }; + +    unless ( !$@ && dbh && dbh->ping ) { +      warn "WARNING: still no connection to database, sleeping for retry...\n"; +      sleep 10; +      next; +    } else { +      warn "WARNING: reconnected to database\n"; +    } +  } + +  #my($job, $ljob); +  #{ +  #  my $oldAutoCommit = $FS::UID::AutoCommit; +  #  local $FS::UID::AutoCommit = 0; +  $FS::UID::AutoCommit = 0; + +  my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. +                 '           WHERE queue_depend.jobnum = queue.jobnum )'; + +  #anything with a priority goes after stuff without one +  my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + +  my $limit = $max_kids - $kids; + +  $order_by .= ( driver_name eq 'mysql' +                   ? " LIMIT $limit FOR UPDATE " +                   : " FOR UPDATE LIMIT $limit " ); + +  my $hashref = { 'status' => 'new' }; +  if ( $opt{'s'} ) { +    $hashref->{'secure'} = 'Y'; +  } elsif ( $opt{'n'} ) { +    $hashref->{'secure'} = ''; +  } + +  #qsearch dies when the db goes away +  my @jobs = eval { +    qsearch({ +      'table'     => 'queue', +      'hashref'   => $hashref, +      'extra_sql' => $nodepend, +      'order_by'  => $order_by, +    }); +  }; +  if ( $@ ) { +    warn "WARNING: error searching for jobs, closing connection: $@"; +    undef $FS::UID::dbh; +    next; +  } + +  unless ( @jobs ) { +    dbh->commit or do { +      warn "WARNING: database error, closing connection: ". dbh->errstr; +      undef $FS::UID::dbh; +      next; +    }; +    sleep 1; +    next; +  } + +  foreach my $job ( @jobs ) { + +    my %hash = $job->hash; +    $hash{'status'} = 'locked'; +    my $ljob = new FS::queue ( \%hash ); +    my $error = $ljob->replace($job); +    if ( $error ) { +      warn "WARNING: database error locking job, closing connection: ". +           dbh->errstr; +      undef $FS::UID::dbh; +      next; +    } + +    dbh->commit or do { +      warn "WARNING: database error, closing connection: ". dbh->errstr; +      undef $FS::UID::dbh; +      next; +    }; + +    $FS::UID::AutoCommit = 1; + +    my @args = eval { $ljob->args; }; +    if ( $@ ) { +      warn "WARNING: error retrieving job arguments, closing connection: $@"; +      undef $FS::UID::dbh; +      next; +    } +    splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + +    defined( my $pid = fork ) or do { +      warn "WARNING: can't fork: $!\n"; +      my %hash = $job->hash; +      $hash{'status'} = 'failed'; +      $hash{'statustext'} = "[freeside-queued] can't fork: $!"; +      my $ljob = new FS::queue ( \%hash ); +      my $error = $ljob->replace($job); +      die $error if $error; #XXX still dying if we can't fork AND we can't connect to the db +      next; #don't increment the kid counter +    }; + +    if ( $pid ) { +      $kids++; +      $kids{$pid} = 1; +    } else { #kid time + +      #get new db handle +      $FS::UID::dbh->{InactiveDestroy} = 1; + +      forksuidsetup($user); + +      dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile'); + +      #auto-use classes... +      if (    $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ +           || $ljob->job =~ /(FS::\w+)::/ +         ) +      { +        my $class = $1; +        eval "use $class;"; +        if ( $@ ) { +          warn "job use $class failed"; +          my %hash = $ljob->hash; +          $hash{'status'} = 'failed'; +          $hash{'statustext'} = $@; +          my $fjob = new FS::queue( \%hash ); +          my $error = $fjob->replace($ljob); +          die $error if $error; +          exit; #end-of-kid +        }; +      } + +      my $eval = "&". $ljob->job. '(@args);'; +      warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; +      eval $eval; #throw away return value?  suppose so +      if ( $@ ) { +        warn "job $eval failed"; +        my %hash = $ljob->hash; +        $hash{'status'} = 'failed'; +        $hash{'statustext'} = $@; +        my $fjob = new FS::queue( \%hash ); +        my $error = $fjob->replace($ljob); +        die $error if $error; +      } else { +        $ljob->delete; +      } + +      if ( UNIVERSAL::can(dbh, 'sprintProfile') ) { +        open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time) +          or die "can't open profile file: $!"; +        print PROFILE dbh->sprintProfile(); +        close PROFILE or die "can't close profile file: $!"; +      } + +      exit; +      #end-of-kid +    } + +  } #foreach my $job + +} continue { +  if ( sigterm() ) { +    warn "received TERM signal; exiting\n"; +    exit; +  } +  if ( sigint() ) { +    warn "received INT signal; exiting\n"; +    exit; +  } +} + +sub untaint_argv { +  foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV +    #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\""; +    # Date::Parse +    $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\""; +    $ARGV[$_]=$1; +  } +} + +sub usage { +  die "Usage:\n\n  freeside-queued user\n"; +} + +sub reap_kids { +  foreach my $pid ( keys %kids ) { +    my $kid = waitpid($pid, WNOHANG); +    if ( $kid > 0 ) { +      $kids--; +      delete $kids{$kid}; +    } +  } +} + +=head1 NAME + +freeside-queued - Job queue daemon + +=head1 SYNOPSIS + +  freeside-queued [ -s | -n ] user + +=head1 DESCRIPTION + +Job queue daemon.  Should be running at all times. + +-s: "secure" jobs only (queued billing jobs) + +-n: non-"secure" jobs only (other jobs) + +user: from the mapsecrets file - see config.html from the base documentation + +=head1 VERSION + +=head1 BUGS + +=head1 SEE ALSO + +=cut + | 
