reduce waiting time for -cdrd and -queued, RT#4667
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $DEBUG $kids $max_kids %kids );
5 use POSIX qw(:sys_wait_h);
6 use IO::File;
7 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
8 use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
9 use FS::Record qw(qsearch qsearchs);
10 use FS::queue;
11 use FS::queue_depend;
12
13 # no autoloading for non-FS classes...
14 use Net::SSH 0.07;
15
16 $DEBUG = 0;
17
18 $max_kids = '10'; #guess it should be a config file...
19 $kids = 0;
20
21 my $user = shift or die &usage;
22
23 warn "starting daemonization (forking)\n" if $DEBUG;
24 #daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
25 daemonize1('freeside-queued');
26
27 warn "dropping privledges\n" if $DEBUG;
28 drop_root();
29
30
31 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
32
33 warn "connecting to database\n" if $DEBUG;
34 $@ = 'not connected';
35 while ( $@ ) {
36   eval { adminsuidsetup $user; };
37   if ( $@ ) {
38     warn $@;
39     warn "sleeping for reconnect...\n";
40     sleep 5;
41   }
42 }
43
44 logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc );
45
46 warn "completing daemonization (detaching))\n" if $DEBUG;
47 daemonize2();
48
49 #--
50
51 my $warnkids=0;
52 while (1) {
53
54   &reap_kids;
55   #prevent runaway forking
56   if ( $kids >= $max_kids ) {
57     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
58     &reap_kids;
59     sleep 1; #waiting for signals is cheap
60     next;
61   }
62   $warnkids=0;
63
64   unless ( dbh && dbh->ping ) {
65     warn "WARNING: connection to database lost, reconnecting...\n";
66
67     eval { $FS::UID::dbh = myconnect; };
68
69     unless ( !$@ && dbh && dbh->ping ) {
70       warn "WARNING: still no connection to database, sleeping for retry...\n";
71       sleep 10;
72       next;
73     } else {
74       warn "WARNING: reconnected to database\n";
75     }
76   }
77
78   #my($job, $ljob);
79   #{
80   #  my $oldAutoCommit = $FS::UID::AutoCommit;
81   #  local $FS::UID::AutoCommit = 0;
82   $FS::UID::AutoCommit = 0;
83
84   my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
85                  '           WHERE queue_depend.jobnum = queue.jobnum )';
86
87   my $order_by = "ORDER BY jobnum ". ( driver_name eq 'mysql'
88                                          ? 'LIMIT 1 FOR UPDATE'
89                                          : 'FOR UPDATE LIMIT 1' );
90
91   my $job = qsearchs({
92     'table'     => 'queue',
93     'hashref'   => { 'status' => 'new' },
94     'extra_sql' => $nodepend,
95     'order_by'  => $order_by,
96   }) or do {
97     # if $oldAutoCommit {
98     dbh->commit or do {
99       warn "WARNING: database error, closing connection: ". dbh->errstr;
100       undef $FS::UID::dbh;
101       next;
102     };
103     # }
104     sleep 1;
105     next;
106   };
107
108   my %hash = $job->hash;
109   $hash{'status'} = 'locked';
110   my $ljob = new FS::queue ( \%hash );
111   my $error = $ljob->replace($job);
112   if ( $error ) {
113     warn "WARNING: database error locking job, closing connection: ".
114          dbh->errstr;
115     undef $FS::UID::dbh;
116     next;
117   }
118
119   # if $oldAutoCommit {
120   dbh->commit or do {
121     warn "WARNING: database error, closing connection: ". dbh->errstr;
122     undef $FS::UID::dbh;
123     next;
124   };
125   # }
126
127   $FS::UID::AutoCommit = 1;
128   #} 
129
130   my @args = $ljob->args;
131   splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
132
133   defined( my $pid = fork ) or do {
134     warn "WARNING: can't fork: $!\n";
135     my %hash = $job->hash;
136     $hash{'status'} = 'failed';
137     $hash{'statustext'} = "[freeside-queued] can't fork: $!";
138     my $ljob = new FS::queue ( \%hash );
139     my $error = $ljob->replace($job);
140     die $error if $error;
141     next; #don't increment the kid counter
142   };
143
144   if ( $pid ) {
145     $kids++;
146     $kids{$pid} = 1;
147   } else { #kid time
148
149     #get new db handle
150     $FS::UID::dbh->{InactiveDestroy} = 1;
151
152     forksuidsetup($user);
153
154     #auto-use classes...
155     #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
156     if (    $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/
157          || $ljob->job =~ /(FS::\w+)::/
158        )
159     {
160       my $class = $1;
161       eval "use $class;";
162       if ( $@ ) {
163         warn "job use $class failed";
164         my %hash = $ljob->hash;
165         $hash{'status'} = 'failed';
166         $hash{'statustext'} = $@;
167         my $fjob = new FS::queue( \%hash );
168         my $error = $fjob->replace($ljob);
169         die $error if $error;
170         exit; #end-of-kid
171       };
172     }
173
174     my $eval = "&". $ljob->job. '(@args);';
175     warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
176     eval $eval; #throw away return value?  suppose so
177     if ( $@ ) {
178       warn "job $eval failed";
179       my %hash = $ljob->hash;
180       $hash{'status'} = 'failed';
181       $hash{'statustext'} = $@;
182       my $fjob = new FS::queue( \%hash );
183       my $error = $fjob->replace($ljob);
184       die $error if $error;
185     } else {
186       $ljob->delete;
187     }
188
189     exit;
190     #end-of-kid
191   }
192
193 } continue {
194   if ( sigterm() ) {
195     warn "received TERM signal; exiting\n";
196     exit;
197   }
198   if ( sigint() ) {
199     warn "received INT signal; exiting\n";
200     exit;
201   }
202 }
203
204 sub usage {
205   die "Usage:\n\n  freeside-queued user\n";
206 }
207
208 sub reap_kids {
209   foreach my $pid ( keys %kids ) {
210     my $kid = waitpid($pid, WNOHANG);
211     if ( $kid > 0 ) {
212       $kids--;
213       delete $kids{$kid};
214     }
215   }
216 }
217
218 =head1 NAME
219
220 freeside-queued - Job queue daemon
221
222 =head1 SYNOPSIS
223
224   freeside-queued user
225
226 =head1 DESCRIPTION
227
228 Job queue daemon.  Should be running at all times.
229
230 user: from the mapsecrets file - see config.html from the base documentation
231
232 =head1 VERSION
233
234 =head1 BUGS
235
236 =head1 SEE ALSO
237
238 =cut
239