use Net::SSH::ssh_cmd for all job queueing rather than local duplicated ssh subs
[freeside.git] / FS / bin / freeside-queued
index 5acffb5..56475d0 100644 (file)
@@ -1,31 +1,38 @@
 #!/usr/bin/perl -w
 
 use strict;
+use vars qw( $log_file $sigterm $sigint );
+use subs qw( _die _logmsg );
 use Fcntl qw(:flock);
 use POSIX qw(setsid);
-use FS::UID qw(adminsuidsetup);
+use Date::Format;
+use IO::File;
+use FS::UID qw(adminsuidsetup forksuidsetup driver_name);
 use FS::Record qw(qsearchs);
 use FS::queue;
 
 # no autoloading just yet
 use FS::cust_main;
+use FS::svc_acct;
+use Net::SSH 0.05;
 
 my $pid_file = '/var/run/freeside-queued.pid';
 
-$SIG{CHLD} = sub { wait }; #zombie prevention
-
-my $sigterm = 0;
-my $sigint = 0;
-$SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; };
-$SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; };
-
 my $user = shift or die &usage;
 
 &daemonize;
 
-my $log_file = "/usr/local/etc/freeside/queuelog.";
+sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; }
+$SIG{CHLD} =  \&REAPER;
+
+ $sigterm = 0;
+ $sigint = 0;
+$SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; };
+$SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; };
 
 $> = $FS::UID::freeside_uid unless $>;
+$< = $>;
+$ENV{HOME} = (getpwuid($>))[7]; #for ssh
 adminsuidsetup $user;
 
 $log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc;
@@ -33,6 +40,7 @@ $log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc;
 $SIG{__DIE__} = \&_die;
 $SIG{__WARN__} = \&_logmsg;
 
+warn "freeside-queued starting\n";
 
 while (1) {
 
@@ -40,7 +48,9 @@ while (1) {
     'queue',
     { 'status' => 'new' },
     '',
-    'ORDER BY jobnum FOR UPDATE LIMIT 1'
+    driver_name =~ /^mysql$/i
+      ? 'ORDER BY jobnum LIMIT 1 FOR UPDATE'
+      : 'ORDER BY jobnum FOR UPDATE LIMIT 1'
   ) or do {
     sleep 5;
     next;
@@ -54,21 +64,42 @@ while (1) {
 
   my @args = $ljob->args;
 
-  #fork a child for each job (up to some maximum perhaps?)
-  #single-threaded for now.
-
-  my $eval = "&". $ljob->job. '(@args);';
-  warn "running $eval";
-  eval $eval;
-  if ( $@ ) {
-    warn "job $eval failed";
-    my $hash = $ljob->hash;
+  # number of children limit?
+  defined( my $pid = fork ) or do {
+    warn "WARNING: can't fork: $!\n";
+    my %hash = $job->hash;
     $hash{'status'} = 'failed';
-    my $fjob = new FS::queue( \%hash );
-    my $error = $fjob->replace($ljob);
+    $hash{'statustext'} = "[freeside-queued] can't fork: $!";
+    my $ljob = new FS::queue ( \%hash );
+    my $error = $ljob->replace($job);
     die $error if $error;
-  } else {
-    $ljob->delete;
+  };
+
+  unless ( $pid ) { #kid time
+
+    #get new db handles
+    $FS::UID::dbh->{InactiveDestroy} = 1;
+    $FS::svc_acct::icradius_dbh->{InactiveDestroy} = 1
+      if $FS::svc_acct::icradius_dbh;
+    forksuidsetup($user);
+
+    my $eval = "&". $ljob->job. '(@args);';
+    warn "running $eval";
+    eval $eval;
+    if ( $@ ) {
+      warn "job $eval failed";
+      my %hash = $ljob->hash;
+      $hash{'status'} = 'failed';
+      $hash{'statustext'} = $@;
+      my $fjob = new FS::queue( \%hash );
+      my $error = $fjob->replace($ljob);
+      die $error if $error;
+    } else {
+      $ljob->delete;
+    }
+
+    exit;
+    #end-of-kid
   }
 
 } continue {
@@ -82,9 +113,8 @@ while (1) {
   }
 }
 
-
-sub datestamp {
-  time2str("%m%d%Y", time);
+sub usage {
+  die "Usage:\n\n  freeside-queued user\n";
 }
 
 sub _die {
@@ -100,6 +130,7 @@ sub _logmsg {
   seek($log, 0, 2);
   print $log "[". time2str("%a %b %e %T %Y",time). "] [$$] $msg\n";
   flock($log, LOCK_UN);
+  close $log;
 }
 
 sub daemonize {