#!/usr/bin/perl -w
use strict;
-use Fcntl qw(:flock);
-use POSIX qw(setsid);
-use FS::UID qw(adminsuidsetup);
-use FS::Record qw(qsearchs);
+use vars qw( $DEBUG $kids $max_kids %kids );
+use POSIX qw(:sys_wait_h);
+use IO::File;
+use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
+use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
+use FS::Record qw(qsearch qsearchs);
use FS::queue;
+use FS::queue_depend;
-# no autoloading just yet
-use FS::cust_main;
+# no autoloading for non-FS classes...
+use Net::SSH 0.07;
-my $pid_file = '/var/run/freeside-queued.pid';
+$DEBUG = 0;
-$SIG{CHLD} = sub { wait }; #zombie prevention
-
-my $sigterm = 0;
-my $sigint = 0;
-$SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; };
-$SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; };
+$max_kids = '10'; #guess it should be a config file...
+$kids = 0;
my $user = shift or die &usage;
-&daemonize;
+warn "starting daemonization (forking)\n" if $DEBUG;
+#daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
+daemonize1('freeside-queued');
+
+warn "dropping privledges\n" if $DEBUG;
+drop_root();
+
-my $log_file = "/usr/local/etc/freeside/queuelog.";
+$ENV{HOME} = (getpwuid($>))[7]; #for ssh
-$> = $FS::UID::freeside_uid unless $>;
-adminsuidsetup $user;
+warn "connecting to database\n" if $DEBUG;
+$@ = 'not connected';
+while ( $@ ) {
+ eval { adminsuidsetup $user; };
+ if ( $@ ) {
+ warn $@;
+ warn "sleeping for reconnect...\n";
+ sleep 5;
+ }
+}
-$log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc;
+logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc );
-$SIG{__DIE__} = \&_die;
-$SIG{__WARN__} = \&_logmsg;
+warn "completing daemonization (detaching))\n" if $DEBUG;
+daemonize2();
+#--
+my $warnkids=0;
while (1) {
+ &reap_kids;
+ #prevent runaway forking
+ if ( $kids >= $max_kids ) {
+ warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
+ &reap_kids;
+ sleep 1; #waiting for signals is cheap
+ next;
+ }
+ $warnkids=0;
+
+ unless ( dbh && dbh->ping ) {
+ warn "WARNING: connection to database lost, reconnecting...\n";
+
+ eval { $FS::UID::dbh = myconnect; };
+
+ unless ( !$@ && dbh && dbh->ping ) {
+ warn "WARNING: still no connection to database, sleeping for retry...\n";
+ sleep 10;
+ next;
+ } else {
+ warn "WARNING: reconnected to database\n";
+ }
+ }
+
+ #my($job, $ljob);
+ #{
+ # my $oldAutoCommit = $FS::UID::AutoCommit;
+ # local $FS::UID::AutoCommit = 0;
+ $FS::UID::AutoCommit = 0;
+
+ #assuming mysql 4.1 w/subqueries now
+ #my $nodepend = driver_name eq 'mysql'
+ # ? ''
+ # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
+ # ' WHERE queue_depend.jobnum = queue.jobnum ) ';
+ my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
+ ' WHERE queue_depend.jobnum = queue.jobnum ) ';
+
my $job = qsearchs(
'queue',
{ 'status' => 'new' },
'',
- 'ORDER BY jobnum FOR UPDATE LIMIT 1'
+ driver_name eq 'mysql'
+ ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
+ : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
) or do {
- sleep 5;
+ # if $oldAutoCommit {
+ dbh->commit or do {
+ warn "WARNING: database error, closing connection: ". dbh->errstr;
+ undef $FS::UID::dbh;
+ next;
+ };
+ # }
+ sleep 5; #connecting to db is expensive
next;
};
+ #assuming mysql 4.1 w/subqueries now
+ #if ( driver_name eq 'mysql'
+ # && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) {
+ # dbh->commit or die dbh->errstr; #if $oldAutoCommit;
+ # sleep 5; #would be better if mysql could do everything in query above
+ # next;
+ #}
+
my %hash = $job->hash;
$hash{'status'} = 'locked';
my $ljob = new FS::queue ( \%hash );
my $error = $ljob->replace($job);
- die $error if $error;
+ if ( $error ) {
+ warn "WARNING: database error locking job, closing connection: ".
+ dbh->errstr;
+ undef $FS::UID::dbh;
+ next;
+ }
- my @args = $ljob->args;
+ # if $oldAutoCommit {
+ dbh->commit or do {
+ warn "WARNING: database error, closing connection: ". dbh->errstr;
+ undef $FS::UID::dbh;
+ next;
+ };
+ # }
- #fork a child for each job (up to some maximum perhaps?)
- #single-threaded for now.
+ $FS::UID::AutoCommit = 1;
+ #}
- my $eval = "&". $ljob->job. '(@args);';
- warn "running $eval";
- eval $eval;
- if ( $@ ) {
- warn "job $eval failed";
- my $hash = $ljob->hash;
+ my @args = $ljob->args;
+ splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
+
+ defined( my $pid = fork ) or do {
+ warn "WARNING: can't fork: $!\n";
+ my %hash = $job->hash;
$hash{'status'} = 'failed';
- my $fjob = new FS::queue( \%hash );
- my $error = $fjob->replace($ljob);
+ $hash{'statustext'} = "[freeside-queued] can't fork: $!";
+ my $ljob = new FS::queue ( \%hash );
+ my $error = $ljob->replace($job);
die $error if $error;
- } else {
- $ljob->delete;
+ next; #don't increment the kid counter
+ };
+
+ if ( $pid ) {
+ $kids++;
+ $kids{$pid} = 1;
+ } else { #kid time
+
+ #get new db handle
+ $FS::UID::dbh->{InactiveDestroy} = 1;
+
+ forksuidsetup($user);
+
+ #auto-use classes...
+ #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
+ if ( $ljob->job =~ /(FS::part_export::\w+)::/
+ || $ljob->job =~ /(FS::\w+)::/
+ )
+ {
+ my $class = $1;
+ eval "use $class;";
+ if ( $@ ) {
+ warn "job use $class failed";
+ my %hash = $ljob->hash;
+ $hash{'status'} = 'failed';
+ $hash{'statustext'} = $@;
+ my $fjob = new FS::queue( \%hash );
+ my $error = $fjob->replace($ljob);
+ die $error if $error;
+ exit; #end-of-kid
+ };
+ }
+
+ my $eval = "&". $ljob->job. '(@args);';
+ warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
+ eval $eval; #throw away return value? suppose so
+ if ( $@ ) {
+ warn "job $eval failed";
+ my %hash = $ljob->hash;
+ $hash{'status'} = 'failed';
+ $hash{'statustext'} = $@;
+ my $fjob = new FS::queue( \%hash );
+ my $error = $fjob->replace($ljob);
+ die $error if $error;
+ } else {
+ $ljob->delete;
+ }
+
+ exit;
+ #end-of-kid
}
} continue {
- if ( $sigterm ) {
+ if ( sigterm() ) {
warn "received TERM signal; exiting\n";
exit;
}
- if ( $sigint ) {
+ if ( sigint() ) {
warn "received INT signal; exiting\n";
exit;
}
}
-
-sub datestamp {
- time2str("%m%d%Y", time);
-}
-
-sub _die {
- my $msg = shift;
- unlink $pid_file if -e $pid_file;
- _logmsg($msg);
-}
-
-sub _logmsg {
- chomp( my $msg = shift );
- my $log = new IO::File ">>$log_file";
- flock($log, LOCK_EX);
- seek($log, 0, 2);
- print $log "[". time2str("%a %b %e %T %Y",time). "] [$$] $msg\n";
- flock($log, LOCK_UN);
+sub usage {
+ die "Usage:\n\n freeside-queued user\n";
}
-sub daemonize {
-
- chdir "/" or die "Can't chdir to /: $!";
- open STDIN, '/dev/null' or die "Can't read /dev/null: $!";
- defined(my $pid = fork) or die "Can't fork: $!";
- if ( $pid ) {
- print "freeside-queued started with pid $pid\n"; #logging to $log_file\n";
- exit unless $pid_file;
- my $pidfh = new IO::File ">$pid_file" or exit;
- print $pidfh "$pid\n";
- exit;
+sub reap_kids {
+ foreach my $pid ( keys %kids ) {
+ my $kid = waitpid($pid, WNOHANG);
+ if ( $kid > 0 ) {
+ $kids--;
+ delete $kids{$kid};
+ }
}
- open STDOUT, '>/dev/null'
- or die "Can't write to /dev/null: $!";
- setsid or die "Can't start a new session: $!";
- open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
-
}
=head1 NAME