67e5e2bca18c6ca55bd4fe9a12019c5b01ce9cbf
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $log_file $sigterm $sigint $kids $max_kids );
5 use subs qw( _die _logmsg );
6 use Fcntl qw(:flock);
7 use POSIX qw(setsid);
8 use Date::Format;
9 use IO::File;
10 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh);
11 use FS::Record qw(qsearchs);
12 use FS::queue;
13 use FS::queue_depend;
14
15 # no autoloading just yet
16 use FS::cust_main;
17 use FS::svc_acct;
18 use Net::SSH 0.05;
19 use FS::part_export;
20
21 $max_kids = '10'; #guess it should be a config file...
22 $kids = 0;
23
24 my $user = shift or die &usage;
25
26 #my $pid_file = "/var/run/freeside-queued.$user.pid";
27 my $pid_file = "/var/run/freeside-queued.pid";
28
29 &daemonize1;
30
31 sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; $kids--; }
32 $SIG{CHLD} =  \&REAPER;
33
34 $sigterm = 0;
35 $sigint = 0;
36 $SIG{INT} = sub { warn "SIGINT received; shutting down\n"; $sigint++; };
37 $SIG{TERM} = sub { warn "SIGTERM received; shutting down\n"; $sigterm++; };
38
39 $> = $FS::UID::freeside_uid unless $>;
40 $< = $>;
41 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
42 adminsuidsetup $user;
43
44 $log_file = "/usr/local/etc/freeside/queuelog.". $FS::UID::datasrc;
45
46 &daemonize2;
47
48 $SIG{__DIE__} = \&_die;
49 $SIG{__WARN__} = \&_logmsg;
50
51 warn "freeside-queued starting\n";
52
53 my $warnkids=0;
54 while (1) {
55
56   #prevent runaway forking
57   if ( $kids >= $max_kids ) {
58     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
59     sleep 1; #waiting for signals is cheap
60     next;
61   }
62   $warnkids=0;
63
64   my $nodepend = driver name eq 'mysql'
65    ? ''
66    : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
67      ' WHERE queue_depend.jobnum = queue.jobnum ) ';
68
69   my($job, $ljob);
70   {
71     my $oldAutoCommit = $FS::UID::AutoCommit;
72     local $FS::UID::AutoCommit = 0;
73     my $dbh = dbh; 
74   
75     $job = qsearchs(
76       'queue',
77       { 'status' => 'new' },
78       '',
79       driver_name eq 'mysql'
80         ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
81         : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
82     ) or do {
83       $dbh->commit or die $dbh->errstr if $oldAutoCommit;
84       sleep 5; #connecting to db is expensive
85       next;
86     };
87
88     if ( driver_name eq 'mysql'
89          && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) {
90       $dbh->commit or die $dbh->errstr if $oldAutoCommit;
91       next;
92     }
93
94     my %hash = $job->hash;
95     $hash{'status'} = 'locked';
96     $ljob = new FS::queue ( \%hash );
97     my $error = $ljob->replace($job);
98     die $error if $error;
99
100     $dbh->commit or die $dbh->errstr if $oldAutoCommit;
101   } 
102
103   my @args = $ljob->args;
104
105   defined( my $pid = fork ) or do {
106     warn "WARNING: can't fork: $!\n";
107     my %hash = $job->hash;
108     $hash{'status'} = 'failed';
109     $hash{'statustext'} = "[freeside-queued] can't fork: $!";
110     my $ljob = new FS::queue ( \%hash );
111     my $error = $ljob->replace($job);
112     die $error if $error;
113     next; #don't increment the kid counter
114   };
115
116   if ( $pid ) {
117     $kids++;
118   } else { #kid time
119
120     #get new db handle
121     $FS::UID::dbh->{InactiveDestroy} = 1;
122
123     forksuidsetup($user);
124
125     #auto-use export classes...
126     if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
127       my $class = $1;
128       eval "use $class;";
129       if ( $@ ) {
130         warn "job use $class failed";
131         my %hash = $ljob->hash;
132         $hash{'status'} = 'failed';
133         $hash{'statustext'} = $@;
134         my $fjob = new FS::queue( \%hash );
135         my $error = $fjob->replace($ljob);
136         die $error if $error;
137         exit; #end-of-kid
138       };
139     }
140
141     my $eval = "&". $ljob->job. '(@args);';
142     warn "running $eval";
143     eval $eval; #throw away return value?  suppose so
144     if ( $@ ) {
145       warn "job $eval failed";
146       my %hash = $ljob->hash;
147       $hash{'status'} = 'failed';
148       $hash{'statustext'} = $@;
149       my $fjob = new FS::queue( \%hash );
150       my $error = $fjob->replace($ljob);
151       die $error if $error;
152     } else {
153       $ljob->delete;
154     }
155
156     exit;
157     #end-of-kid
158   }
159
160 } continue {
161   if ( $sigterm ) {
162     warn "received TERM signal; exiting\n";
163     exit;
164   }
165   if ( $sigint ) {
166     warn "received INT signal; exiting\n";
167     exit;
168   }
169 }
170
171 sub usage {
172   die "Usage:\n\n  freeside-queued user\n";
173 }
174
175 sub _die {
176   my $msg = shift;
177   unlink $pid_file if -e $pid_file;
178   _logmsg($msg);
179 }
180
181 sub _logmsg {
182   chomp( my $msg = shift );
183   my $log = new IO::File ">>$log_file";
184   flock($log, LOCK_EX);
185   seek($log, 0, 2);
186   print $log "[". time2str("%a %b %e %T %Y",time). "] [$$] $msg\n";
187   flock($log, LOCK_UN);
188   close $log;
189 }
190
191 sub daemonize1 {
192
193   chdir "/" or die "Can't chdir to /: $!";
194   open STDIN, '/dev/null'   or die "Can't read /dev/null: $!";
195   defined(my $pid = fork) or die "Can't fork: $!";
196   if ( $pid ) {
197     print "freeside-queued started with pid $pid\n"; #logging to $log_file\n";
198     exit unless $pid_file;
199     my $pidfh = new IO::File ">$pid_file" or exit;
200     print $pidfh "$pid\n";
201     exit;
202   }
203   #open STDOUT, '>/dev/null'
204   #                          or die "Can't write to /dev/null: $!";
205   #setsid                  or die "Can't start a new session: $!";
206   #open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
207
208 }
209
210 sub daemonize2 {
211   open STDOUT, '>/dev/null'
212                             or die "Can't write to /dev/null: $!";
213   setsid                  or die "Can't start a new session: $!";
214   open STDERR, '>&STDOUT' or die "Can't dup stdout: $!";
215 }
216
217 =head1 NAME
218
219 freeside-queued - Job queue daemon
220
221 =head1 SYNOPSIS
222
223   freeside-queued user
224
225 =head1 DESCRIPTION
226
227 Job queue daemon.  Should be running at all times.
228
229 user: from the mapsecrets file - see config.html from the base documentation
230
231 =head1 VERSION
232
233 =head1 BUGS
234
235 =head1 SEE ALSO
236
237 =cut
238