X-Git-Url: http://git.freeside.biz/gitweb/?p=freeside.git;a=blobdiff_plain;f=FS%2Fbin%2Ffreeside-queued;h=42d00cebef7bc73f85149003561530ccc0036c40;hp=d8af19866a42303fa6c18d1a1f6c0514ce3873d4;hb=fe610572852e2eb7c3458e77dc167c25a098c84a;hpb=0ee3925f49c81dbabdd8042eba96fa55f8d3141e diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index d8af19866..42d00cebe 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -1,79 +1,164 @@ #!/usr/bin/perl -w use strict; -use vars qw( $log_file $sigterm $sigint ); +use vars qw( $log_file $sigterm $sigint $kids $max_kids ); use subs qw( _die _logmsg ); use Fcntl qw(:flock); use POSIX qw(setsid); use Date::Format; use IO::File; -use FS::UID qw(adminsuidsetup); -use FS::Record qw(qsearchs); +use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh); +use FS::Record qw(qsearch qsearchs); use FS::queue; +use FS::queue_depend; # no autoloading just yet use FS::cust_main; use FS::svc_acct; -use Net::SSH; +use Net::SSH 0.06; +use FS::part_export; -my $pid_file = '/var/run/freeside-queued.pid'; - -$SIG{CHLD} = sub { wait }; #zombie prevention +$max_kids = '10'; #guess it should be a config file... +$kids = 0; my $user = shift or die &usage; -&daemonize; +#my $pid_file = "/var/run/freeside-queued.$user.pid"; +my $pid_file = "/var/run/freeside-queued.pid"; + +&daemonize1; + +sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; $kids--; } +$SIG{CHLD} = \&REAPER; - $sigterm = 0; - $sigint = 0; +$sigterm = 0; +$sigint = 0; $SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; }; $SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; }; $> = $FS::UID::freeside_uid unless $>; +$< = $>; +$ENV{HOME} = (getpwuid($>))[7]; #for ssh adminsuidsetup $user; $log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc; +&daemonize2; + $SIG{__DIE__} = \&_die; $SIG{__WARN__} = \&_logmsg; -warn "freesied-queued starting\n"; +warn "freeside-queued starting\n"; +my $warnkids=0; while (1) { + #prevent runaway forking + if ( $kids >= $max_kids ) { + warn "WARNING: maximum $kids children reached\n" unless $warnkids++; + sleep 1; #waiting for signals is cheap + next; + } + $warnkids=0; + + my $nodepend = driver_name eq 'mysql' + ? '' + : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. + ' WHERE queue_depend.jobnum = queue.jobnum ) '; + + #my($job, $ljob); + #{ + # my $oldAutoCommit = $FS::UID::AutoCommit; + # local $FS::UID::AutoCommit = 0; + $FS::UID::AutoCommit = 0; + my $dbh = dbh; + my $job = qsearchs( 'queue', { 'status' => 'new' }, '', - 'ORDER BY jobnum FOR UPDATE LIMIT 1' + driver_name eq 'mysql' + ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE" + : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1" ) or do { - sleep 5; + $dbh->commit or die $dbh->errstr; #if $oldAutoCommit; + sleep 5; #connecting to db is expensive next; }; + if ( driver_name eq 'mysql' + && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) { + $dbh->commit or die $dbh->errstr; #if $oldAutoCommit; + sleep 5; #would be better if mysql could do everything in query above + next; + } + my %hash = $job->hash; $hash{'status'} = 'locked'; my $ljob = new FS::queue ( \%hash ); my $error = $ljob->replace($job); die $error if $error; - my @args = $ljob->args; + $dbh->commit or die $dbh->errstr; #if $oldAutoCommit; + + $FS::UID::AutoCommit = 1; + #} - #fork a child for each job (up to some maximum perhaps?) - #single-threaded for now. + my @args = $ljob->args; - my $eval = "&". $ljob->job. '(@args);'; - warn "running $eval"; - eval $eval; - if ( $@ ) { - warn "job $eval failed"; - my %hash = $ljob->hash; + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; $hash{'status'} = 'failed'; - my $fjob = new FS::queue( \%hash ); - my $error = $fjob->replace($ljob); + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); die $error if $error; - } else { - $ljob->delete; + next; #don't increment the kid counter + }; + + if ( $pid ) { + $kids++; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + #auto-use export classes... + if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + warn "running $eval"; + eval $eval; #throw away return value? suppose so + if ( $@ ) { + warn "job $eval failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + } else { + $ljob->delete; + } + + exit; + #end-of-kid } } continue { @@ -87,6 +172,10 @@ while (1) { } } +sub usage { + die "Usage:\n\n freeside-queued user\n"; +} + sub _die { my $msg = shift; unlink $pid_file if -e $pid_file; @@ -103,7 +192,7 @@ sub _logmsg { close $log; } -sub daemonize { +sub daemonize1 { chdir "/" or die "Can't chdir to /: $!"; open STDIN, '/dev/null' or die "Can't read /dev/null: $!"; @@ -115,11 +204,18 @@ sub daemonize { print $pidfh "$pid\n"; exit; } + #open STDOUT, '>/dev/null' + # or die "Can't write to /dev/null: $!"; + #setsid or die "Can't start a new session: $!"; + #open STDERR, '>&STDOUT' or die "Can't dup stdout: $!"; + +} + +sub daemonize2 { open STDOUT, '>/dev/null' or die "Can't write to /dev/null: $!"; setsid or die "Can't start a new session: $!"; open STDERR, '>&STDOUT' or die "Can't dup stdout: $!"; - } =head1 NAME