make the config directory configurable
[freeside.git] / FS / bin / freeside-queued
1 #!/usr/bin/perl -w
2
3 use strict;
4 use vars qw( $DEBUG $kids $max_kids %kids );
5 use POSIX qw(:sys_wait_h);
6 use IO::File;
7 use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect);
8 use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm);
9 use FS::Record qw(qsearch qsearchs);
10 use FS::queue;
11 use FS::queue_depend;
12
13 # no autoloading for non-FS classes...
14 use Net::SSH 0.07;
15
16 $DEBUG = 0;
17
18 $max_kids = '10'; #guess it should be a config file...
19 $kids = 0;
20
21 my $user = shift or die &usage;
22
23 warn "starting daemonization (forking)\n" if $DEBUG;
24 #daemonize1('freeside-queued',$user); #to keep pid files unique w/multi installs
25 daemonize1('freeside-queued');
26
27 warn "dropping privledges\n" if $DEBUG;
28 drop_root();
29
30
31 $ENV{HOME} = (getpwuid($>))[7]; #for ssh
32
33 warn "connecting to database\n" if $DEBUG;
34 $@ = 'not connected';
35 while ( $@ ) {
36   eval { adminsuidsetup $user; };
37   if ( $@ ) {
38     warn $@;
39     warn "sleeping for reconnect...\n";
40     sleep 5;
41   }
42 }
43
44 logfile( "%%%FREESIDE_CONF%%%/queuelog.". $FS::UID::datasrc );
45
46 warn "completing daemonization (detaching))\n" if $DEBUG;
47 daemonize2();
48
49 #--
50
51 my $warnkids=0;
52 while (1) {
53
54   &reap_kids;
55   #prevent runaway forking
56   if ( $kids >= $max_kids ) {
57     warn "WARNING: maximum $kids children reached\n" unless $warnkids++;
58     &reap_kids;
59     sleep 1; #waiting for signals is cheap
60     next;
61   }
62   $warnkids=0;
63
64   unless ( dbh && dbh->ping ) {
65     warn "WARNING: connection to database lost, reconnecting...\n";
66
67     eval { $FS::UID::dbh = myconnect; };
68
69     unless ( !$@ && dbh && dbh->ping ) {
70       warn "WARNING: still no connection to database, sleeping for retry...\n";
71       sleep 10;
72       next;
73     } else {
74       warn "WARNING: reconnected to database\n";
75     }
76   }
77
78   #my($job, $ljob);
79   #{
80   #  my $oldAutoCommit = $FS::UID::AutoCommit;
81   #  local $FS::UID::AutoCommit = 0;
82   $FS::UID::AutoCommit = 0;
83
84   #assuming mysql 4.1 w/subqueries now
85   #my $nodepend = driver_name eq 'mysql'
86   # ? ''
87   # : 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
88   #   ' WHERE queue_depend.jobnum = queue.jobnum ) ';
89   my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'.
90                  '           WHERE queue_depend.jobnum = queue.jobnum ) ';
91
92   my $job = qsearchs(
93     'queue',
94     { 'status' => 'new' },
95     '',
96     driver_name eq 'mysql'
97       ? "$nodepend ORDER BY jobnum LIMIT 1 FOR UPDATE"
98       : "$nodepend ORDER BY jobnum FOR UPDATE LIMIT 1"
99   ) or do {
100     # if $oldAutoCommit {
101     dbh->commit or do {
102       warn "WARNING: database error, closing connection: ". dbh->errstr;
103       undef $FS::UID::dbh;
104       next;
105     };
106     # }
107     sleep 5; #connecting to db is expensive
108     next;
109   };
110
111   #assuming mysql 4.1 w/subqueries now
112   #if ( driver_name eq 'mysql'
113   #     && qsearch('queue_depend', { 'jobnum' => $job->jobnum } ) ) {
114   #  dbh->commit or die dbh->errstr; #if $oldAutoCommit;
115   #  sleep 5; #would be better if mysql could do everything in query above
116   #  next;
117   #}
118
119   my %hash = $job->hash;
120   $hash{'status'} = 'locked';
121   my $ljob = new FS::queue ( \%hash );
122   my $error = $ljob->replace($job);
123   if ( $error ) {
124     warn "WARNING: database error locking job, closing connection: ".
125          dbh->errstr;
126     undef $FS::UID::dbh;
127     next;
128   }
129
130   # if $oldAutoCommit {
131   dbh->commit or do {
132     warn "WARNING: database error, closing connection: ". dbh->errstr;
133     undef $FS::UID::dbh;
134     next;
135   };
136   # }
137
138   $FS::UID::AutoCommit = 1;
139   #} 
140
141   my @args = $ljob->args;
142   splice @args, 0, 1, $ljob if $args[0] eq '_JOB';
143
144   defined( my $pid = fork ) or do {
145     warn "WARNING: can't fork: $!\n";
146     my %hash = $job->hash;
147     $hash{'status'} = 'failed';
148     $hash{'statustext'} = "[freeside-queued] can't fork: $!";
149     my $ljob = new FS::queue ( \%hash );
150     my $error = $ljob->replace($job);
151     die $error if $error;
152     next; #don't increment the kid counter
153   };
154
155   if ( $pid ) {
156     $kids++;
157     $kids{$pid} = 1;
158   } else { #kid time
159
160     #get new db handle
161     $FS::UID::dbh->{InactiveDestroy} = 1;
162
163     forksuidsetup($user);
164
165     #auto-use classes...
166     #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) {
167     if (    $ljob->job =~ /(FS::part_export::\w+)::/
168          || $ljob->job =~ /(FS::\w+)::/
169        )
170     {
171       my $class = $1;
172       eval "use $class;";
173       if ( $@ ) {
174         warn "job use $class failed";
175         my %hash = $ljob->hash;
176         $hash{'status'} = 'failed';
177         $hash{'statustext'} = $@;
178         my $fjob = new FS::queue( \%hash );
179         my $error = $fjob->replace($ljob);
180         die $error if $error;
181         exit; #end-of-kid
182       };
183     }
184
185     my $eval = "&". $ljob->job. '(@args);';
186     warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG;
187     eval $eval; #throw away return value?  suppose so
188     if ( $@ ) {
189       warn "job $eval failed";
190       my %hash = $ljob->hash;
191       $hash{'status'} = 'failed';
192       $hash{'statustext'} = $@;
193       my $fjob = new FS::queue( \%hash );
194       my $error = $fjob->replace($ljob);
195       die $error if $error;
196     } else {
197       $ljob->delete;
198     }
199
200     exit;
201     #end-of-kid
202   }
203
204 } continue {
205   if ( sigterm() ) {
206     warn "received TERM signal; exiting\n";
207     exit;
208   }
209   if ( sigint() ) {
210     warn "received INT signal; exiting\n";
211     exit;
212   }
213 }
214
215 sub usage {
216   die "Usage:\n\n  freeside-queued user\n";
217 }
218
219 sub reap_kids {
220   foreach my $pid ( keys %kids ) {
221     my $kid = waitpid($pid, WNOHANG);
222     if ( $kid > 0 ) {
223       $kids--;
224       delete $kids{$kid};
225     }
226   }
227 }
228
229 =head1 NAME
230
231 freeside-queued - Job queue daemon
232
233 =head1 SYNOPSIS
234
235   freeside-queued user
236
237 =head1 DESCRIPTION
238
239 Job queue daemon.  Should be running at all times.
240
241 user: from the mapsecrets file - see config.html from the base documentation
242
243 =head1 VERSION
244
245 =head1 BUGS
246
247 =head1 SEE ALSO
248
249 =cut
250