0be3d9d7a55b51a78e148107ed0e29a2315bf6b5
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $log_file $sigterm $sigint $kids $max_kids %kids );
5 use subs qw( _die _logmsg );
6 use Fcntl qw(:flock);
7 use POSIX qw(:sys_wait_h setsid);
8 use Date::Format;
9 use IO::File;
10 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
11 use FS::Record qw(qsearch qsearchs);
12 use FS::queue;
13 use FS::queue_depend;
14
15 # no autoloading just yet
16 use FS::cust_main;
17 use FS::svc_acct;
18 use Net::SSH 0.07;
19 use FS::part_export;
20
21 $max_kids = '10'; #guess it should be a config file...
22 $kids = 0;
23
24 my $user = shift or die &usage;
25
26 #my $pid_file = "/var/run/freeside-queued.$user.pid";
27 my $pid_file = "/var/run/freeside-queued.pid";
28
29 &daemonize1;
30
31 #sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; $kids--; }
32 #$SIG{CHLD} =  \&REAPER;
33
34 $sigterm = 0;
35 $sigint = 0;
36 $SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; };
37 $SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; };
38
39 my $freeside_gid = scalar(getgrnam('freeside'))
40   or die "can't setgid to freeside group\n";
41 $) = $freeside_gid;
42 $( = $freeside_gid;
43 #if freebsd can't setuid(), presumably it can't setgid() either.  grr fleabsd
44 ($(,$)) = ($),$();
45 $) = $freeside_gid;
46
47 $> = $FS::UID::freeside_uid;
48 $< = $FS::UID::freeside_uid;
49 #freebsd is sofa king broken, won't setuid()
50 ($<,$>) = ($>,$<);
51 $> = $FS::UID::freeside_uid;
52
53 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
54 adminsuidsetup $user;
55
56 $log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc;
57
58 &daemonize2;
59
60 $SIG{__DIE__} = \&_die;
61 $SIG{__WARN__} = \&_logmsg;
62
63 warn "freeside-queued starting\n";
64
65 my $warnkids=0;
66 while (1) {
67
68   &reap_kids;
69   #prevent runaway forking
70   if ( $kids >= $max_kids ) {
71     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
72     &reap_kids;
73     sleep 1; #waiting for signals is cheap
74     next;
75   }
76   $warnkids=0;
77
78   my $dbh = dbh;
79   unless ( $dbh->ping ) {
80     warn "WARNING: connection to database lost, reconnecting...\n";
81     myconnect;
82     unless ( $dbh->ping ) {
83       warn "WARNING: still no connection to database, sleeping for retry...\n";
84       sleep 10;
85       next;
86     }
87   }
88
89   #my($job, $ljob);
90   #{
91   #  my $oldAutoCommit = $FS::UID::AutoCommit;
92   #  local $FS::UID::AutoCommit = 0;
93   $FS::UID::AutoCommit = 0;
94   
95   my $nodepend = driver_name eq 'mysql'
96    ? ''
97    : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
98      ' WHERE queue_depend.jobnum = queue.jobnum ) ';
99   my $job = qsearchs(
100     'queue',
101     { 'status' => 'new' },
102     '',
103     driver_name eq 'mysql'
104       ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
105       : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
106   ) or do {
107     $dbh->commit or die $dbh->errstr; #if $oldAutoCommit;
108     sleep 5; #connecting to db is expensive
109     next;
110   };
111
112   if ( driver_name eq 'mysql'
113        && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) {
114     $dbh->commit or die $dbh->errstr; #if $oldAutoCommit;
115     sleep 5; #would be better if mysql could do everything in query above
116     next;
117   }
118
119   my %hash = $job->hash;
120   $hash{'status'} = 'locked';
121   my $ljob = new FS::queue ( \%hash );
122   my $error = $ljob->replace($job);
123   die $error if $error;
124
125   $dbh->commit or die $dbh->errstr; #if $oldAutoCommit;
126
127   $FS::UID::AutoCommit = 1;
128   #} 
129
130   my @args = $ljob->args;
131
132   defined( my $pid = fork ) or do {
133     warn "WARNING: can't fork: $!\n";
134     my %hash = $job->hash;
135     $hash{'status'} = 'failed';
136     $hash{'statustext'} = "[freeside-queued] can't fork: $!";
137     my $ljob = new FS::queue ( \%hash );
138     my $error = $ljob->replace($job);
139     die $error if $error;
140     next; #don't increment the kid counter
141   };
142
143   if ( $pid ) {
144     $kids++;
145     $kids{$pid} = 1;
146   } else { #kid time
147
148     #get new db handle
149     $FS::UID::dbh->{InactiveDestroy} = 1;
150
151     forksuidsetup($user);
152
153     #auto-use export classes...
154     if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
155       my $class = $1;
156       eval "use $class;";
157       if ( $@ ) {
158         warn "job use $class failed";
159         my %hash = $ljob->hash;
160         $hash{'status'} = 'failed';
161         $hash{'statustext'} = $@;
162         my $fjob = new FS::queue( \%hash );
163         my $error = $fjob->replace($ljob);
164         die $error if $error;
165         exit; #end-of-kid
166       };
167     }
168
169     my $eval = "&". $ljob->job. '(@args);';
170     warn "running $eval";
171     eval $eval; #throw away return value?  suppose so
172     if ( $@ ) {
173       warn "job $eval failed";
174       my %hash = $ljob->hash;
175       $hash{'status'} = 'failed';
176       $hash{'statustext'} = $@;
177       my $fjob = new FS::queue( \%hash );
178       my $error = $fjob->replace($ljob);
179       die $error if $error;
180     } else {
181       $ljob->delete;
182     }
183
184     exit;
185     #end-of-kid
186   }
187
188 } continue {
189   if ( $sigterm ) {
190     warn "received TERM signal; exiting\n";
191     exit;
192   }
193   if ( $sigint ) {
194     warn "received INT signal; exiting\n";
195     exit;
196   }
197 }
198
199 sub usage {
200   die "Usage:\n\n  freeside-queued user\n";
201 }
202
203 sub _die {
204   my $msg = shift;
205   unlink $pid_file if -e $pid_file;
206   _logmsg($msg);
207 }
208
209 sub _logmsg {
210   chomp( my $msg = shift );
211   my $log = new IO::File ">>$log_file";
212   flock($log, LOCK_EX);
213   seek($log, 0, 2);
214   print $log "[". time2str("%a %b %e %T %Y",time). "] [$$] $msg\n";
215   flock($log, LOCK_UN);
216   close $log;
217 }
218
219 sub daemonize1 {
220
221   chdir "/" or die "Can't chdir to /: $!";
222   open STDIN, '/dev/null'   or die "Can't read /dev/null: $!";
223   defined(my $pid = fork) or die "Can't fork: $!";
224   if ( $pid ) {
225     print "freeside-queued started with pid $pid\n"; #logging to $log_file\n";
226     exit unless $pid_file;
227     my $pidfh = new IO::File ">$pid_file" or exit;
228     print $pidfh "$pid\n";
229     exit;
230   }
231   #open STDOUT, '>/dev/null'
232   #                          or die "Can't write to /dev/null: $!";
233   #setsid                  or die "Can't start a new session: $!";
234   #open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
235
236 }
237
238 sub daemonize2 {
239   open STDOUT, '>/dev/null'
240                             or die "Can't write to /dev/null: $!";
241   setsid                  or die "Can't start a new session: $!";
242   open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
243 }
244
245 sub reap_kids {
246   foreach my $pid ( keys %kids ) {
247     my $kid = waitpid($pid, WNOHANG);
248     if ( $kid > 0 ) {
249       $kids--;
250       delete $kids{$kid};
251     }
252   }
253 }
254
255 =head1 NAME
256
257 freeside-queued - Job queue daemon
258
259 =head1 SYNOPSIS
260
261   freeside-queued user
262
263 =head1 DESCRIPTION
264
265 Job queue daemon.  Should be running at all times.
266
267 user: from the mapsecrets file - see config.html from the base documentation
268
269 =head1 VERSION
270
271 =head1 BUGS
272
273 =head1 SEE ALSO
274
275 =cut
276