better retry behavior for non-found taqua CDR rewrites, RT#12181
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $DEBUG $kids $max_kids $sleep_time %kids );
5 use POSIX qw(:sys_wait_h);
6 use IO::File;
7 use Getopt::Std;
8 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
9 use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
10 use FS::Conf;
11 use FS::Record qw(qsearch);
12 use FS::queue;
13 use FS::queue_depend;
14
15 # no autoloading for non-FS classes...
16 use Net::SSH 0.07;
17
18 $DEBUG = 0;
19
20 $kids = 0;
21
22 &untaint_argv;  #what it sounds like  (eww)
23 use vars qw(%opt);
24 getopts('sn', \%opt );
25
26 my $user = shift or die &usage;
27
28 warn "starting daemonization (forking)\n" if $DEBUG;
29 #daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
30 daemonize1('freeside-queued');
31
32 warn "dropping privledges\n" if $DEBUG;
33 drop_root();
34
35 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
36
37 warn "connecting to database\n" if $DEBUG;
38 $@ = 'not connected';
39 while ( $@ ) {
40   eval { adminsuidsetup $user; };
41   if ( $@ ) {
42     warn $@;
43     warn "sleeping for reconnect...\n";
44     sleep 5;
45   }
46 }
47
48 logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc );
49
50 warn "completing daemonization (detaching))\n" if $DEBUG;
51 daemonize2();
52
53 #--
54
55 my $conf = new FS::Conf;
56 $max_kids = $conf->config('queued-max_kids') || 10;
57 $sleep_time = $conf->config('queued-sleep_time') || 10;
58
59 my $warnkids=0;
60 while (1) {
61
62   &reap_kids;
63   #prevent runaway forking
64   if ( $kids >= $max_kids ) {
65     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
66     &reap_kids;
67     sleep 1; #waiting for signals is cheap
68     next;
69   }
70   $warnkids=0;
71
72   unless ( dbh && dbh->ping ) {
73     warn "WARNING: connection to database lost, reconnecting...\n";
74
75     eval { $FS::UID::dbh = myconnect; };
76
77     unless ( !$@ && dbh && dbh->ping ) {
78       warn "WARNING: still no connection to database, sleeping for retry...\n";
79       sleep 10;
80       next;
81     } else {
82       warn "WARNING: reconnected to database\n";
83     }
84   }
85
86   #my($job, $ljob);
87   #{
88   #  my $oldAutoCommit = $FS::UID::AutoCommit;
89   #  local $FS::UID::AutoCommit = 0;
90   $FS::UID::AutoCommit = 0;
91
92   my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'.
93                  '           WHERE queue_depend.jobnum = queue.jobnum )';
94
95   #anything with a priority goes after stuff without one
96   my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC ';
97
98   my $limit = $max_kids - $kids;
99
100   $order_by .= ( driver_name eq 'mysql'
101                    ? " LIMIT $limit FOR UPDATE "
102                    : " FOR UPDATE LIMIT $limit " );
103
104   my $hashref = { 'status' => 'new' };
105   if ( $opt{'s'} ) {
106     $hashref->{'secure'} = 'Y';
107   } elsif ( $opt{'n'} ) {
108     $hashref->{'secure'} = '';
109   }
110
111   #qsearch dies when the db goes away
112   my @jobs = eval {
113     qsearch({
114       'table'     => 'queue',
115       'hashref'   => $hashref,
116       'extra_sql' => $nodepend,
117       'order_by'  => $order_by,
118     });
119   };
120   if ( $@ ) {
121     warn "WARNING: error searching for jobs, closing connection: $@";
122     undef $FS::UID::dbh;
123     next;
124   }
125
126   unless ( @jobs ) {
127     dbh->commit or do {
128       warn "WARNING: database error, closing connection: ". dbh->errstr;
129       undef $FS::UID::dbh;
130       next;
131     };
132     sleep $sleep_time;
133     next;
134   }
135
136   foreach my $job ( @jobs ) {
137
138     my %hash = $job->hash;
139     $hash{'status'} = 'locked';
140     my $ljob = new FS::queue ( \%hash );
141     my $error = $ljob->replace($job);
142     if ( $error ) {
143       warn "WARNING: database error locking job, closing connection: ".
144            dbh->errstr;
145       undef $FS::UID::dbh;
146       next;
147     }
148
149     dbh->commit or do {
150       warn "WARNING: database error, closing connection: ". dbh->errstr;
151       undef $FS::UID::dbh;
152       next;
153     };
154
155     $FS::UID::AutoCommit = 1;
156
157     my @args = eval { $ljob->args; };
158     if ( $@ ) {
159       warn "WARNING: error retrieving job arguments, closing connection: $@";
160       undef $FS::UID::dbh;
161       next;
162     }
163     splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
164
165     defined( my $pid = fork ) or do {
166       warn "WARNING: can't fork: $!\n";
167       my %hash = $job->hash;
168       $hash{'status'} = 'failed';
169       $hash{'statustext'} = "[freeside-queued] can't fork: $!";
170       my $ljob = new FS::queue ( \%hash );
171       my $error = $ljob->replace($job);
172       die $error if $error; #XXX still dying if we can't fork AND we can't connect to the db
173       next; #don't increment the kid counter
174     };
175
176     if ( $pid ) {
177       $kids++;
178       $kids{$pid} = 1;
179     } else { #kid time
180
181       #get new db handle
182       $FS::UID::dbh->{InactiveDestroy} = 1;
183
184       forksuidsetup($user);
185
186       dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile');
187
188       #auto-use classes...
189       if (    $ljob->job =~ /(FS::(part_export|cust_main|cust_pkg)::\w+)::/
190            || $ljob->job =~ /(FS::\w+)::/
191          )
192       {
193         my $class = $1;
194         eval "use $class;";
195         if ( $@ ) {
196           warn "job use $class failed";
197           my %hash = $ljob->hash;
198           $hash{'status'} = 'failed';
199           $hash{'statustext'} = $@;
200           my $fjob = new FS::queue( \%hash );
201           my $error = $fjob->replace($ljob);
202           die $error if $error;
203           exit; #end-of-kid
204         };
205       }
206
207       my $eval = "&". $ljob->job. '(@args);';
208       warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
209       eval $eval; #throw away return value?  suppose so
210       if ( $@ ) {
211         my %hash = $ljob->hash;
212         $hash{'statustext'} = $@;
213         if ( $hash{'statustext'} =~ /\/misc\/queued_report/ ) { #use return?
214           $hash{'status'} = 'done'; 
215         } else {
216           $hash{'status'} = 'failed';
217           warn "job $eval failed";
218         }
219         my $fjob = new FS::queue( \%hash );
220         my $error = $fjob->replace($ljob);
221         die $error if $error;
222       } else {
223         $ljob->delete;
224       }
225
226       if ( UNIVERSAL::can(dbh, 'sprintProfile') ) {
227         open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time)
228           or die "can't open profile file: $!";
229         print PROFILE dbh->sprintProfile();
230         close PROFILE or die "can't close profile file: $!";
231       }
232
233       exit;
234       #end-of-kid
235     }
236
237   } #foreach my $job
238
239 } continue {
240   if ( sigterm() ) {
241     warn "received TERM signal; exiting\n";
242     exit;
243   }
244   if ( sigint() ) {
245     warn "received INT signal; exiting\n";
246     exit;
247   }
248 }
249
250 sub untaint_argv {
251   foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV
252     #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\"";
253     # Date::Parse
254     $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\"";
255     $ARGV[$_]=$1;
256   }
257 }
258
259 sub usage {
260   die "Usage:\n\n  freeside-queued user\n";
261 }
262
263 sub reap_kids {
264   foreach my $pid ( keys %kids ) {
265     my $kid = waitpid($pid, WNOHANG);
266     if ( $kid > 0 ) {
267       $kids--;
268       delete $kids{$kid};
269     }
270   }
271 }
272
273 =head1 NAME
274
275 freeside-queued - Job queue daemon
276
277 =head1 SYNOPSIS
278
279   freeside-queued [ -s | -n ] user
280
281 =head1 DESCRIPTION
282
283 Job queue daemon.  Should be running at all times.
284
285 -s: "secure" jobs only (queued billing jobs)
286
287 -n: non-"secure" jobs only (other jobs)
288
289 user: from the mapsecrets file - see config.html from the base documentation
290
291 =head1 VERSION
292
293 =head1 BUGS
294
295 =head1 SEE ALSO
296
297 =cut
298