This commit was generated by cvs2svn to compensate for changes in r11022,
[freeside.git] / fs_selfservice / FS-SelfService / freeside-selfservice-clientd
1 #!/usr/bin/perl -w
2 #
3 # freeside-selfservice-clientd
4 #
5 # This is run REMOTELY over ssh by freeside-selfservice-server
6
7 use strict;
8 use subs qw(spawn logmsg lock_write unlock_write);
9 use Fcntl qw(:flock);
10 use POSIX qw(:sys_wait_h);
11 use Socket;
12 use Storable 2.09 qw(nstore_fd fd_retrieve);
13 use IO::Handle qw(_IONBF);
14 use IO::Select;
15 use IO::File;
16 use Text::CSV_XS;
17
18 #STDOUT->setbuf('');
19
20 my $tag = scalar(@ARGV) ? '.'.shift : '';
21
22 use vars qw( $Debug );
23 $Debug = 2; #2 will turn on child logging
24             #3 will log packet contents,#including passwords
25             #4 will log receipts of all packets from server including
26             #  keepalives (big!)
27
28 my $socket = "/usr/local/freeside/selfservice_socket$tag";
29 my $pid_file = "$socket.pid";
30
31 my $log_file = "/usr/local/freeside/selfservice$tag.log";
32
33 my $lock_file = "/usr/local/freeside/selfservice$tag.writelock";
34
35 #my $me = '[client]';
36
37 $|=1;
38
39 $SIG{__WARN__} = \&_logmsg;
40 #$SIG{__DIE__} = sub { &_logmsg(@_); exit };
41
42 #read data to be cached or something
43 #warn "$me Reading init data\n" if $Debug;
44 #my $signup_init = 
45
46 warn "Creating $lock_file\n" if $Debug;
47 open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
48 close LOCKFILE;
49
50 warn "Creating $socket\n" if $Debug;
51 my $uaddr = sockaddr_un($socket);
52 my $proto = getprotobyname('tcp');
53 socket(Server,PF_UNIX,SOCK_STREAM,0) or die "socket: $!";
54 unlink($socket);
55 bind(Server, $uaddr) or die "bind: $!";
56 listen(Server,SOMAXCONN) or die "listen: $!";
57
58 if ( -e $pid_file ) {
59   open(PIDFILE,"<$pid_file");
60   my $old_pid = <PIDFILE>;
61   close PIDFILE;
62   if ( $old_pid =~ /^(\d+)$/ ) {
63     kill 'TERM', $1;
64   }
65 }
66 open(PIDFILE,">$pid_file");
67 print PIDFILE "$$\n";
68 close PIDFILE;
69
70 #my $waitedpid;
71 #sub REAPER { $waitedpid = wait; $SIG{CHLD} = \&REAPER; }
72 #$SIG{CHLD} =  \&REAPER;
73
74 warn "enabling keep alives\n" if $Debug;
75 nstore_fd( { _packet => '_enable_keepalive' } , \*STDOUT );
76
77 warn "entering main loop\n" if $Debug;
78
79 my %kids;
80 my %ftp_scan_dir;
81 my %ftp_scan_map;
82
83 my $s = new IO::Select;
84 $s->add(\*STDIN);
85 $s->add(\*Server);
86
87 #for ( $waitedpid = 0;
88 #      accept(Client,Server) || $waitedpid;
89 #      $waitedpid = 0, close Client)
90 #{
91 #  next if $waitedpid;
92
93 #$SIG{PIPE} = sub { warn "SIGPIPE received" };
94 #$SIG{CHLD} = sub { warn "SIGCHLD received" };
95
96 #sub REAPER { warn "SIGCHLD received"; my $pid = wait; $SIG{CHLD} = \&REAPER; }
97 #sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; }
98 #sub REAPER { my $pid = wait; delete $kids{$pid}; $SIG{CHLD} = \&REAPER; }
99 #$SIG{CHLD} =  \&REAPER;
100
101 my $undisp = 0;
102 while (1) {
103
104   &reap_kids;
105
106   warn "waiting for connection\n" if $Debug && !$undisp;
107
108   #my @handles = $s->can_read();
109   my @handles = $s->can_read(5);
110   $undisp = !scalar(@handles);
111   foreach my $handle ( @handles ) {
112
113     if ( $handle == \*STDIN ) {
114
115       warn "receiving packet from server\n" if $Debug > 3;
116
117       my $packet = fd_retrieve(\*STDIN);
118       my $token = $packet->{'_token'};
119
120       if ( $token eq '_keepalive' ) {
121         $undisp = 1;
122         next;
123       }
124
125       warn "received packet from server with token $token\n".
126            ( $Debug > 2
127              ? join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
128              : '' )
129         if $Debug;
130
131       if ( $token eq '_ftp_scan' ) {
132         if ( $ftp_scan_dir{$packet->{dir}} ) {
133           warn "already processing ". $packet->{dir}. "\n" if $Debug;
134         } else {
135           $ftp_scan_dir{$packet->{dir}} = 1;
136           spawn \&ftp_scan, $packet;
137         }
138         $undisp = 1;
139         next;
140       }
141
142       if ( exists($kids{$token}) ) {
143         warn "sending return packet to $token via $kids{$token}\n"
144           if $Debug;
145         nstore_fd($packet, $kids{$token});
146         warn "flushing to $token\n" if $Debug;
147         until ( $kids{$token}->flush ) {
148           warn "WARNING: error flushing: $!";
149           sleep 1;
150         }
151         #no close or delete here - will block waiting for child
152         warn "done with $token\n" if $Debug;
153       } else {
154         warn "WARNING: unknown token $token, discarding message";
155       }
156
157     } elsif ( $handle == \*Server ) {
158
159       until ( accept(Client, Server) ) {
160         warn "WARNING: accept failed: $!";
161         next;
162       }
163
164       warn "received local connection; forking\n" if $Debug;
165
166       spawn sub { #child
167         warn "[child-$$] reading packet from local client" if $Debug > 1;
168         my $packet = fd_retrieve(\*Client);
169         warn "[child-$$] packet received:\n".
170              join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
171           if $Debug > 2;
172         my $command = $packet->{'command'};
173         #handle some commands weirdly?
174         $packet->{_token}=$$;
175
176         my $rv = send_and_wait( $packet );
177
178         warn "[child-$$] closing write stream\n" if $Debug > 1;
179         close STDOUT or die "FATAL: can't close write stream: $!"; #??!
180
181         #close STDIN;
182
183         warn "[child-$$] sending response to local client" if $Debug > 1;
184         nstore_fd($rv, \*Client);
185         Client->flush or die "FATAL: can't flush to local client: $!";
186         close Client or die "FATAL: can't close connection to local client: $!";
187
188         warn "[child-$$] child exiting" if $Debug > 1;
189         exit;
190
191       }; #eo child
192
193       #close Client;
194
195     } else {
196       die "wtf?  $handle";
197     }
198
199   }
200   
201 }
202
203 sub reap_kids {
204   #warn "reaping kids\n";
205   foreach my $pid ( keys %kids ) {
206     my $kid = waitpid($pid, WNOHANG);
207     if ( $kid > 0 ) {
208       close $kids{$kid};
209       delete $kids{$kid};
210       if ( $ftp_scan_map{$kid} ) {
211         delete($ftp_scan_dir{$ftp_scan_map{$kid}});
212         delete($ftp_scan_map{$kid});
213       }
214     }
215   }
216   #warn "done reaping\n";
217 }
218
219 sub spawn {
220     my ( $coderef, $packet ) = ( shift, shift );
221
222     unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') {
223         use Carp;
224         confess "usage: spawn CODEREF";
225     }
226
227     my $pid;
228     #if (!defined($pid = fork)) {
229     my $kid = new IO::Handle;
230     if (!defined($pid = open($kid, '|-'))) {
231         warn "WARNING: cannot fork: $!";
232         return;
233     } elsif ($pid) {
234         warn "begat $pid" if $Debug;
235         $ftp_scan_map{$pid} = $packet->{dir} if $coderef == \&ftp_scan;
236         $kids{$pid} = $kid;
237         #$kids{$pid}->autoflush;
238         return; # I'm the parent
239     }
240     # else I'm the child -- go spawn
241
242 #    open(STDIN,  "<&Client")   || die "can't dup client to stdin";
243 #    open(STDOUT, ">&Client")   || die "can't dup client to stdout";
244 #     open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr";
245     exit &$coderef($packet);
246 }
247
248 sub _logmsg {
249   chomp( my $msg = shift );
250   my $log = new IO::File ">>$log_file";
251   die "can't open $log_file: $!" unless defined($log);
252   flock($log, LOCK_EX);
253   seek($log, 0, 2);
254   print $log "[client] [". scalar(localtime). "] [$$] $msg\n";
255   flock($log, LOCK_UN);
256   close $log;
257 }
258
259 sub send_and_wait {
260   my $packet = shift;
261
262   warn "[child-$$] locking write stream\n" if $Debug > 1;
263   lock_write;
264
265   warn "[child-$$] sending packet to remote server\n" if $Debug > 1;
266   nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!";
267         
268   warn "[child-$$] flushing write stream\n" if $Debug > 1;
269   STDOUT->flush or die "FATAL: can't flush: $!";
270         
271   warn "[child-$$] releasing write lock\n" if $Debug > 1;
272   unlock_write;
273
274   warn "[child-$$] waiting for response from parent\n" if $Debug > 1;
275   my $w = new IO::Select;
276   $w->add(\*STDIN);
277   until ( $w->can_read ) {
278     warn "[child-$$] WARNING: interrupted select: $!\n";
279   }
280
281   fd_retrieve(\*STDIN);
282 }
283
284 sub lock_write {
285   #broken on freebsd?
286   #flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!";
287
288   #open a new one for each kid to get a unique lock
289   open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
290
291   flock(LOCKFILE, LOCK_EX) or die "FATAL: can't lock $lock_file: $!";
292 }
293
294 sub unlock_write {
295   #broken on freebsd?
296   #flock(STDOUT, LOCK_UN) or die "FATAL: can't release write lock: $!";
297
298   flock(LOCKFILE, LOCK_UN) or die "FATAL: can't unlock $lock_file: $!";
299 }
300
301 sub ftp_scan {
302   my $packet = shift;
303
304   warn "[child-$$] performing ftp scan" if $Debug > 1;
305
306   warn "[child-$$] packet received:\n".
307        join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
308     if $Debug > 2;
309
310   $packet->{_token}=$$;
311
312   my $dir;
313   $packet->{dir} =~ /^(.*)$/ && ($dir = $1); # we trust ourselves
314   opendir(DIR, $dir) or die "failed to open directory $dir: $!\n";
315   my @files = grep(/\.csv$/, readdir(DIR));
316   closedir(DIR);
317
318   foreach my $file ( @files ) {
319     warn "Processing $file ...\n";
320     my $csv = Text::CSV_XS->new();
321     my $err = "";
322     my @records = ();
323     open(CSV, "<$dir/$file") or die "can't open input file for $file: $!\n";
324     open(RESULT, ">$dir/result/$file")
325       or die "can't open result file for $file: $!\n";
326
327     while (<CSV>) {
328       if ( $csv->parse($_) ) {
329         my @columns = $csv->fields();
330         push(@records, \@columns);
331       } else {
332         $err = $csv->error_input;
333         last;
334       }
335     }
336     close(CSV);
337     if ( $err ) {
338       rename("$dir/$file", "$dir/rejected/$file");
339     } else {
340       foreach my $record ( @records ) {
341
342         $packet->{row} = $record;
343         $packet->{_packet} = 'Bulk/processrow';
344         my $result = send_and_wait( $packet );
345
346         if ( $result->{error} ) {
347           my $name;
348           $record->[1] =~ /^(\w+)$/ && ( $name = $1 );
349
350           if ($name) {
351             my $filename = "$dir/rejected/$name";
352             open(REC, ">$filename") or die "can't open $filename: $!\n";
353             print REC join(',', @$record);
354             close REC or die $!;
355             open(ERR, ">$filename.err") or die "can't open $filename.err: $!\n";
356             print ERR $result->{error};
357             close ERR or die $!;
358           }else{
359             warn "bad agent_custid";
360           }
361
362         }
363         print RESULT $result->{message}, "\n";
364       }
365
366       rename("$dir/$file", "$dir/processed/$file");
367       warn "$file processed.\n" if $Debug;
368     }
369     close(RESULT);
370   }
371
372   close STDOUT or die "FATAL: can't close write stream: $!"; #??!
373
374   warn "[child-$$] child exiting" if $Debug > 1;
375   exit;
376
377 }