846055dc3f75ad5041b4fed20c30aebac41bbbd1
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $log_file $sigterm $sigint $kids $max_kids );
5 use subs qw( _die _logmsg );
6 use Fcntl qw(:flock);
7 use POSIX qw(setsid);
8 use Date::Format;
9 use IO::File;
10 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh);
11 use FS::Record qw(qsearchs);
12 use FS::queue;
13 use FS::queue_depend;
14
15 # no autoloading just yet
16 use FS::cust_main;
17 use FS::svc_acct;
18 use Net::SSH 0.05;
19 use FS::part_export;
20
21 $max_kids = '10'; #guess it should be a config file...
22 $kids = 0;
23
24 my $user = shift or die &usage;
25
26 #my $pid_file = "/var/run/freeside-queued.$user.pid";
27 my $pid_file = "/var/run/freeside-queued.pid";
28
29 &daemonize1;
30
31 sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; $kids--; }
32 $SIG{CHLD} =  \&REAPER;
33
34 $sigterm = 0;
35 $sigint = 0;
36 $SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; };
37 $SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; };
38
39 $> = $FS::UID::freeside_uid unless $>;
40 $< = $>;
41 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
42 adminsuidsetup $user;
43
44 $log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc;
45
46 &daemonize2;
47
48 $SIG{__DIE__} = \&_die;
49 $SIG{__WARN__} = \&_logmsg;
50
51 warn "freeside-queued starting\n";
52
53 my $warnkids=0;
54 while (1) {
55
56   #prevent runaway forking
57   if ( $kids >= $max_kids ) {
58     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
59     sleep 1; #waiting for signals is cheap
60     next;
61   }
62   $warnkids=0;
63
64   my $nodepend = driver_name eq 'mysql'
65    ? ''
66    : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
67      ' WHERE queue_depend.jobnum = queue.jobnum ) ';
68
69   #my($job, $ljob);
70   #{
71   #  my $oldAutoCommit = $FS::UID::AutoCommit;
72   #  local $FS::UID::AutoCommit = 0;
73   $FS::UID::AutoCommit = 0;
74   my $dbh = dbh; 
75   
76   my $job = qsearchs(
77     'queue',
78     { 'status' => 'new' },
79     '',
80     driver_name eq 'mysql'
81       ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
82       : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
83   ) or do {
84     $dbh->commit or die $dbh->errstr; #if $oldAutoCommit;
85     sleep 5; #connecting to db is expensive
86     next;
87   };
88
89   if ( driver_name eq 'mysql'
90        && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) {
91     $dbh->commit or die $dbh->errstr; #if $oldAutoCommit;
92     sleep 5; #would be better if mysql could do everything in query above
93     next;
94   }
95
96   my %hash = $job->hash;
97   $hash{'status'} = 'locked';
98   my $ljob = new FS::queue ( \%hash );
99   my $error = $ljob->replace($job);
100   die $error if $error;
101
102   $dbh->commit or die $dbh->errstr; #if $oldAutoCommit;
103
104   $FS::UID::AutoCommit = 1;
105   #} 
106
107   my @args = $ljob->args;
108
109   defined( my $pid = fork ) or do {
110     warn "WARNING: can't fork: $!\n";
111     my %hash = $job->hash;
112     $hash{'status'} = 'failed';
113     $hash{'statustext'} = "[freeside-queued] can't fork: $!";
114     my $ljob = new FS::queue ( \%hash );
115     my $error = $ljob->replace($job);
116     die $error if $error;
117     next; #don't increment the kid counter
118   };
119
120   if ( $pid ) {
121     $kids++;
122   } else { #kid time
123
124     #get new db handle
125     $FS::UID::dbh->{InactiveDestroy} = 1;
126
127     forksuidsetup($user);
128
129     #auto-use export classes...
130     if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
131       my $class = $1;
132       eval "use $class;";
133       if ( $@ ) {
134         warn "job use $class failed";
135         my %hash = $ljob->hash;
136         $hash{'status'} = 'failed';
137         $hash{'statustext'} = $@;
138         my $fjob = new FS::queue( \%hash );
139         my $error = $fjob->replace($ljob);
140         die $error if $error;
141         exit; #end-of-kid
142       };
143     }
144
145     my $eval = "&". $ljob->job. '(@args);';
146     warn "running $eval";
147     eval $eval; #throw away return value?  suppose so
148     if ( $@ ) {
149       warn "job $eval failed";
150       my %hash = $ljob->hash;
151       $hash{'status'} = 'failed';
152       $hash{'statustext'} = $@;
153       my $fjob = new FS::queue( \%hash );
154       my $error = $fjob->replace($ljob);
155       die $error if $error;
156     } else {
157       $ljob->delete;
158     }
159
160     exit;
161     #end-of-kid
162   }
163
164 } continue {
165   if ( $sigterm ) {
166     warn "received TERM signal; exiting\n";
167     exit;
168   }
169   if ( $sigint ) {
170     warn "received INT signal; exiting\n";
171     exit;
172   }
173 }
174
175 sub usage {
176   die "Usage:\n\n  freeside-queued user\n";
177 }
178
179 sub _die {
180   my $msg = shift;
181   unlink $pid_file if -e $pid_file;
182   _logmsg($msg);
183 }
184
185 sub _logmsg {
186   chomp( my $msg = shift );
187   my $log = new IO::File ">>$log_file";
188   flock($log, LOCK_EX);
189   seek($log, 0, 2);
190   print $log "[". time2str("%a %b %e %T %Y",time). "] [$$] $msg\n";
191   flock($log, LOCK_UN);
192   close $log;
193 }
194
195 sub daemonize1 {
196
197   chdir "/" or die "Can't chdir to /: $!";
198   open STDIN, '/dev/null'   or die "Can't read /dev/null: $!";
199   defined(my $pid = fork) or die "Can't fork: $!";
200   if ( $pid ) {
201     print "freeside-queued started with pid $pid\n"; #logging to $log_file\n";
202     exit unless $pid_file;
203     my $pidfh = new IO::File ">$pid_file" or exit;
204     print $pidfh "$pid\n";
205     exit;
206   }
207   #open STDOUT, '>/dev/null'
208   #                          or die "Can't write to /dev/null: $!";
209   #setsid                  or die "Can't start a new session: $!";
210   #open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
211
212 }
213
214 sub daemonize2 {
215   open STDOUT, '>/dev/null'
216                             or die "Can't write to /dev/null: $!";
217   setsid                  or die "Can't start a new session: $!";
218   open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
219 }
220
221 =head1 NAME
222
223 freeside-queued - Job queue daemon
224
225 =head1 SYNOPSIS
226
227   freeside-queued user
228
229 =head1 DESCRIPTION
230
231 Job queue daemon.  Should be running at all times.
232
233 user: from the mapsecrets file - see config.html from the base documentation
234
235 =head1 VERSION
236
237 =head1 BUGS
238
239 =head1 SEE ALSO
240
241 =cut
242