add priority to job queue so billing jobs don't don't drown out provisioning jobs
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $DEBUG $kids $max_kids %kids );
5 use POSIX qw(:sys_wait_h);
6 use IO::File;
7 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
8 use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
9 use FS::Record qw(qsearch qsearchs);
10 use FS::queue;
11 use FS::queue_depend;
12
13 # no autoloading for non-FS classes...
14 use Net::SSH 0.07;
15
16 $DEBUG = 0;
17
18 $max_kids = '10'; #guess it should be a config file...
19 $kids = 0;
20
21 my $user = shift or die &usage;
22
23 warn "starting daemonization (forking)\n" if $DEBUG;
24 #daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
25 daemonize1('freeside-queued');
26
27 warn "dropping privledges\n" if $DEBUG;
28 drop_root();
29
30
31 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
32
33 warn "connecting to database\n" if $DEBUG;
34 $@ = 'not connected';
35 while ( $@ ) {
36   eval { adminsuidsetup $user; };
37   if ( $@ ) {
38     warn $@;
39     warn "sleeping for reconnect...\n";
40     sleep 5;
41   }
42 }
43
44 logfile( "%%%FREESIDE_LOG%%%/queuelog.". $FS::UID::datasrc );
45
46 warn "completing daemonization (detaching))\n" if $DEBUG;
47 daemonize2();
48
49 #--
50
51 my $warnkids=0;
52 while (1) {
53
54   &reap_kids;
55   #prevent runaway forking
56   if ( $kids >= $max_kids ) {
57     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
58     &reap_kids;
59     sleep 1; #waiting for signals is cheap
60     next;
61   }
62   $warnkids=0;
63
64   unless ( dbh && dbh->ping ) {
65     warn "WARNING: connection to database lost, reconnecting...\n";
66
67     eval { $FS::UID::dbh = myconnect; };
68
69     unless ( !$@ && dbh && dbh->ping ) {
70       warn "WARNING: still no connection to database, sleeping for retry...\n";
71       sleep 10;
72       next;
73     } else {
74       warn "WARNING: reconnected to database\n";
75     }
76   }
77
78   #my($job, $ljob);
79   #{
80   #  my $oldAutoCommit = $FS::UID::AutoCommit;
81   #  local $FS::UID::AutoCommit = 0;
82   $FS::UID::AutoCommit = 0;
83
84   my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
85                  '           WHERE queue_depend.jobnum = queue.jobnum )';
86
87   #anything with a priority goes after stuff without one
88   my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC ';
89
90   $order_by .= ( driver_name eq 'mysql'
91                    ? ' LIMIT 1 FOR UPDATE '
92                    : ' FOR UPDATE LIMIT 1 ' );
93
94   my $job = qsearchs({
95     'table'     => 'queue',
96     'hashref'   => { 'status' => 'new' },
97     'extra_sql' => $nodepend,
98     'order_by'  => $order_by,
99   }) or do {
100     # if $oldAutoCommit {
101     dbh->commit or do {
102       warn "WARNING: database error, closing connection: ". dbh->errstr;
103       undef $FS::UID::dbh;
104       next;
105     };
106     # }
107     sleep 1;
108     next;
109   };
110
111   my %hash = $job->hash;
112   $hash{'status'} = 'locked';
113   my $ljob = new FS::queue ( \%hash );
114   my $error = $ljob->replace($job);
115   if ( $error ) {
116     warn "WARNING: database error locking job, closing connection: ".
117          dbh->errstr;
118     undef $FS::UID::dbh;
119     next;
120   }
121
122   # if $oldAutoCommit {
123   dbh->commit or do {
124     warn "WARNING: database error, closing connection: ". dbh->errstr;
125     undef $FS::UID::dbh;
126     next;
127   };
128   # }
129
130   $FS::UID::AutoCommit = 1;
131   #} 
132
133   my @args = $ljob->args;
134   splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
135
136   defined( my $pid = fork ) or do {
137     warn "WARNING: can't fork: $!\n";
138     my %hash = $job->hash;
139     $hash{'status'} = 'failed';
140     $hash{'statustext'} = "[freeside-queued] can't fork: $!";
141     my $ljob = new FS::queue ( \%hash );
142     my $error = $ljob->replace($job);
143     die $error if $error;
144     next; #don't increment the kid counter
145   };
146
147   if ( $pid ) {
148     $kids++;
149     $kids{$pid} = 1;
150   } else { #kid time
151
152     #get new db handle
153     $FS::UID::dbh->{InactiveDestroy} = 1;
154
155     forksuidsetup($user);
156
157     #auto-use classes...
158     #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
159     if (    $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/
160          || $ljob->job =~ /(FS::\w+)::/
161        )
162     {
163       my $class = $1;
164       eval "use $class;";
165       if ( $@ ) {
166         warn "job use $class failed";
167         my %hash = $ljob->hash;
168         $hash{'status'} = 'failed';
169         $hash{'statustext'} = $@;
170         my $fjob = new FS::queue( \%hash );
171         my $error = $fjob->replace($ljob);
172         die $error if $error;
173         exit; #end-of-kid
174       };
175     }
176
177     my $eval = "&". $ljob->job. '(@args);';
178     warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
179     eval $eval; #throw away return value?  suppose so
180     if ( $@ ) {
181       warn "job $eval failed";
182       my %hash = $ljob->hash;
183       $hash{'status'} = 'failed';
184       $hash{'statustext'} = $@;
185       my $fjob = new FS::queue( \%hash );
186       my $error = $fjob->replace($ljob);
187       die $error if $error;
188     } else {
189       $ljob->delete;
190     }
191
192     exit;
193     #end-of-kid
194   }
195
196 } continue {
197   if ( sigterm() ) {
198     warn "received TERM signal; exiting\n";
199     exit;
200   }
201   if ( sigint() ) {
202     warn "received INT signal; exiting\n";
203     exit;
204   }
205 }
206
207 sub usage {
208   die "Usage:\n\n  freeside-queued user\n";
209 }
210
211 sub reap_kids {
212   foreach my $pid ( keys %kids ) {
213     my $kid = waitpid($pid, WNOHANG);
214     if ( $kid > 0 ) {
215       $kids--;
216       delete $kids{$kid};
217     }
218   }
219 }
220
221 =head1 NAME
222
223 freeside-queued - Job queue daemon
224
225 =head1 SYNOPSIS
226
227   freeside-queued user
228
229 =head1 DESCRIPTION
230
231 Job queue daemon.  Should be running at all times.
232
233 user: from the mapsecrets file - see config.html from the base documentation
234
235 =head1 VERSION
236
237 =head1 BUGS
238
239 =head1 SEE ALSO
240
241 =cut
242