fix DBI connection, RT#39250
[freeside.git] / FS / bin / freeside-queued
index d16513b..36871b2 100644 (file)
@@ -1,23 +1,31 @@
 #!/usr/bin/perl -w
 
 use strict;
-use vars qw( $DEBUG $kids $max_kids %kids );
+use vars qw( $DEBUG $kids $max_kids $sleep_time %kids );
 use POSIX qw(:sys_wait_h);
 use IO::File;
+use Getopt::Std;
 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
 use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
-use FS::Record qw(qsearch qsearchs);
+use FS::Conf;
+use FS::Record qw(qsearch);
 use FS::queue;
 use FS::queue_depend;
+use FS::queue_stat;
+use FS::Log;
+use FS::Cron::expire_user_pref qw( expire_user_pref );
 
 # no autoloading for non-FS classes...
 use Net::SSH 0.07;
 
 $DEBUG = 0;
 
-$max_kids = '10'; #guess it should be a config file...
 $kids = 0;
 
+&untaint_argv;  #what it sounds like  (eww)
+use vars qw(%opt);
+getopts('sn', \%opt );
+
 my $user = shift or die &usage;
 
 warn "starting daemonization (forking)\n" if $DEBUG;
@@ -27,7 +35,6 @@ daemonize1('freeside-queued');
 warn "dropping privledges\n" if $DEBUG;
 drop_root();
 
-
 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
 
 warn "connecting to database\n" if $DEBUG;
@@ -41,13 +48,18 @@ while ( $@ ) {
   }
 }
 
-logfile( "%%%FREESIDE_CONF%%%/queuelog.". $FS::UID::datasrc );
+my $log = FS::Log->new('queue');
+logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc );
 
 warn "completing daemonization (detaching))\n" if $DEBUG;
 daemonize2();
 
 #--
 
+my $conf = new FS::Conf;
+$max_kids = $conf->config('queued-max_kids') || 10;
+$sleep_time = $conf->config('queued-sleep_time') || 10;
+
 my $warnkids=0;
 while (1) {
 
@@ -56,6 +68,7 @@ while (1) {
   if ( $kids >= $max_kids ) {
     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
     &reap_kids;
+    expire_user_pref() unless $warnkids % 10;
     sleep 1; #waiting for signals is cheap
     next;
   }
@@ -81,125 +94,181 @@ while (1) {
   #  local $FS::UID::AutoCommit = 0;
   $FS::UID::AutoCommit = 0;
 
-  #assuming mysql 4.1 w/subqueries now
-  #my $nodepend = driver_name eq 'mysql'
-  # ? ''
-  # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
-  #   ' WHERE queue_depend.jobnum = queue.jobnum ) ';
-  my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
-                 '           WHERE queue_depend.jobnum = queue.jobnum ) ';
-
-  my $job = qsearchs(
-    'queue',
-    { 'status' => 'new' },
-    '',
-    driver_name eq 'mysql'
-      ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
-      : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
-  ) or do {
-    # if $oldAutoCommit {
+  my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'.
+                 '           WHERE queue_depend.jobnum = queue.jobnum )';
+
+  #anything with a priority goes after stuff without one
+  my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC ';
+
+  my $limit = $max_kids - $kids;
+
+  $order_by .= ( driver_name eq 'mysql'
+                   ? " LIMIT $limit FOR UPDATE "
+                   : " FOR UPDATE LIMIT $limit " );
+
+  my $hashref = { 'status' => 'new' };
+  if ( $opt{'s'} ) {
+    $hashref->{'secure'} = 'Y';
+  } elsif ( $opt{'n'} ) {
+    $hashref->{'secure'} = '';
+  }
+
+  #qsearch dies when the db goes away
+  my @jobs = eval {
+    qsearch({
+      'table'     => 'queue',
+      'hashref'   => $hashref,
+      'extra_sql' => $nodepend,
+      'order_by'  => $order_by,
+    });
+  };
+  if ( $@ ) {
+    warn "WARNING: error searching for jobs, closing connection: $@";
+    undef $FS::UID::dbh;
+    next;
+  }
+
+  unless ( @jobs ) {
     dbh->commit or do {
       warn "WARNING: database error, closing connection: ". dbh->errstr;
       undef $FS::UID::dbh;
       next;
     };
-    # }
-    sleep 5; #connecting to db is expensive
-    next;
-  };
-
-  #assuming mysql 4.1 w/subqueries now
-  #if ( driver_name eq 'mysql'
-  #     && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) {
-  #  dbh->commit or die dbh->errstr; #if $oldAutoCommit;
-  #  sleep 5; #would be better if mysql could do everything in query above
-  #  next;
-  #}
-
-  my %hash = $job->hash;
-  $hash{'status'} = 'locked';
-  my $ljob = new FS::queue ( \%hash );
-  my $error = $ljob->replace($job);
-  if ( $error ) {
-    warn "WARNING: database error locking job, closing connection: ".
-         dbh->errstr;
-    undef $FS::UID::dbh;
+    expire_user_pref();
+    sleep $sleep_time;
     next;
   }
 
-  # if $oldAutoCommit {
-  dbh->commit or do {
-    warn "WARNING: database error, closing connection: ". dbh->errstr;
-    undef $FS::UID::dbh;
-    next;
-  };
-  # }
+  foreach my $job ( @jobs ) {
 
-  $FS::UID::AutoCommit = 1;
-  #} 
+    my $start_date = time;
 
-  my @args = $ljob->args;
-  splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
+    $log->debug('locking queue job', object => $job);
 
-  defined( my $pid = fork ) or do {
-    warn "WARNING: can't fork: $!\n";
     my %hash = $job->hash;
-    $hash{'status'} = 'failed';
-    $hash{'statustext'} = "[freeside-queued] can't fork: $!";
+    $hash{'status'} = 'locked';
     my $ljob = new FS::queue ( \%hash );
     my $error = $ljob->replace($job);
-    die $error if $error;
-    next; #don't increment the kid counter
-  };
+    if ( $error ) {
+      warn "WARNING: database error locking job, closing connection: ".
+           dbh->errstr;
+      undef $FS::UID::dbh;
+      next;
+    }
 
-  if ( $pid ) {
-    $kids++;
-    $kids{$pid} = 1;
-  } else { #kid time
+    dbh->commit or do {
+      warn "WARNING: database error, closing connection: ". dbh->errstr;
+      undef $FS::UID::dbh;
+      next;
+    };
 
-    #get new db handle
-    $FS::UID::dbh->{InactiveDestroy} = 1;
+    $FS::UID::AutoCommit = 1;
 
-    forksuidsetup($user);
+    my @args = eval { $ljob->args; };
+    if ( $@ ) {
+      warn "WARNING: error retrieving job arguments, closing connection: $@";
+      undef $FS::UID::dbh;
+      next;
+    }
+    splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
+
+    defined( my $pid = fork ) or do {
+      warn "WARNING: can't fork: $!\n";
+      my %hash = $job->hash;
+      $hash{'status'} = 'failed';
+      $hash{'statustext'} = "[freeside-queued] can't fork: $!";
+      my $ljob = new FS::queue ( \%hash );
+      my $error = $ljob->replace($job);
+      die $error if $error; #XXX still dying if we can't fork AND we can't connect to the db
+      next; #don't increment the kid counter
+    };
 
-    #auto-use classes...
-    #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
-    if (    $ljob->job =~ /(FS::part_export::\w+)::/
-         || $ljob->job =~ /(FS::\w+)::/
-       )
-    {
-      my $class = $1;
-      eval "use $class;";
+    if ( $pid ) {
+      $kids++;
+      $kids{$pid} = 1;
+    } else { #kid time
+
+      #get new db handle
+      $FS::UID::dbh->{InactiveDestroy} = 1;
+
+      forksuidsetup($user);
+
+      dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile');
+
+      #auto-use classes...
+      if (    $ljob->job =~ /(FS::(part_export|cust_main|cust_pkg|part_pkg|Cron)::\w+)::/
+           || $ljob->job =~ /(FS::\w+)::/
+         )
+      {
+        my $class = $1;
+        eval "use $class;";
+        if ( $@ ) {
+          warn "job use $class failed";
+          my %hash = $ljob->hash;
+          $hash{'status'} = 'failed';
+          $hash{'statustext'} = $@;
+          my $fjob = new FS::queue( \%hash );
+          my $error = $fjob->replace($ljob);
+          die $error if $error;
+          exit; #end-of-kid
+        };
+      }
+
+      my $eval = "&". $ljob->job. '(@args);';
+      # don't put @args in the log, may expose passwords
+      $log->info('starting job ('.$ljob->job.')');
+      warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
+      local $FS::UID::AutoCommit = 0; # so that we can clean up failures
+      do {
+        # switch user only if a job user is available
+        local $FS::CurrentUser::CurrentUser = $ljob->access_user || $FS::CurrentUser::CurrentUser;
+        eval $eval; #throw away return value?  suppose so
+      };
       if ( $@ ) {
-        warn "job use $class failed";
+        dbh->rollback;
         my %hash = $ljob->hash;
-        $hash{'status'} = 'failed';
         $hash{'statustext'} = $@;
+        if ( $hash{'statustext'} =~ /\/misc\/queued_report/ ) { #use return?
+          $hash{'status'} = 'done'; 
+        } else {
+          $hash{'status'} = 'failed';
+          warn "job $eval failed";
+        }
         my $fjob = new FS::queue( \%hash );
         my $error = $fjob->replace($ljob);
         die $error if $error;
-        exit; #end-of-kid
-      };
+        dbh->commit; # for the status change only
+      } else {
+        $ljob->delete;
+        dbh->commit; # for the job itself
+      }
+
+      if ( $ljob->job eq 'FS::cust_main::queued_bill' ) {
+        my $queue_stat = new FS::queue_stat {
+          'jobnum'      => $ljob->jobnum,
+          'job'         => $ljob->job,
+          'custnum'     => $ljob->custnum,
+          'insert_date' => $ljob->_date,
+          'start_date'  => $start_date,
+          'end_date'    => time,
+        };
+        my $error = $queue_stat->insert;
+        die $error if $error;
+        dbh->commit; #for the stat
+      }
+
+      if ( UNIVERSAL::can(dbh, 'sprintProfile') ) {
+        open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time)
+          or die "can't open profile file: $!";
+        print PROFILE dbh->sprintProfile();
+        close PROFILE or die "can't close profile file: $!";
+      }
+
+      exit;
+      #end-of-kid
     }
 
-    my $eval = "&". $ljob->job. '(@args);';
-    warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
-    eval $eval; #throw away return value?  suppose so
-    if ( $@ ) {
-      warn "job $eval failed";
-      my %hash = $ljob->hash;
-      $hash{'status'} = 'failed';
-      $hash{'statustext'} = $@;
-      my $fjob = new FS::queue( \%hash );
-      my $error = $fjob->replace($ljob);
-      die $error if $error;
-    } else {
-      $ljob->delete;
-    }
-
-    exit;
-    #end-of-kid
-  }
+  } #foreach my $job
 
 } continue {
   if ( sigterm() ) {
@@ -212,6 +281,15 @@ while (1) {
   }
 }
 
+sub untaint_argv {
+  foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV
+    #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\"";
+    # Date::Parse
+    $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\"";
+    $ARGV[$_]=$1;
+  }
+}
+
 sub usage {
   die "Usage:\n\n  freeside-queued user\n";
 }
@@ -232,13 +310,17 @@ freeside-queued - Job queue daemon
 
 =head1 SYNOPSIS
 
-  freeside-queued user
+  freeside-queued [ -s | -n ] user
 
 =head1 DESCRIPTION
 
 Job queue daemon.  Should be running at all times.
 
-user: from the mapsecrets file - see config.html from the base documentation
+-s: "secure" jobs only (queued billing jobs)
+
+-n: non-"secure" jobs only (other jobs)
+
+user: Typically "fs_queue"
 
 =head1 VERSION