RT# 81961 Repair broken links in POD documentation
[freeside.git] / fs_selfservice / FS-SelfService / freeside-selfservice-clientd
1 #!/usr/bin/perl -w
2
3 =head1 NAME
4
5 freeside-selfservice-clientd
6
7 =head1 DESCRIPTION
8
9 This is run REMOTELY over ssh by freeside-selfservice-server
10
11 =cut
12
13 use strict;
14 use subs qw(spawn logmsg lock_write unlock_write);
15 use Fcntl qw(:flock);
16 use POSIX qw(:sys_wait_h);
17 use Socket;
18 use Storable 2.09 qw(nstore_fd fd_retrieve);
19 use IO::Handle qw(_IONBF);
20 use IO::Select;
21 use IO::File;
22 use Text::CSV_XS;
23
24 #STDOUT->setbuf('');
25
26 my $tag = scalar(@ARGV) ? '.'.shift : '';
27
28 use vars qw( $Debug );
29 $Debug = 2; #2 will turn on child logging
30             #3 will log packet contents,#including passwords
31             #4 will log receipts of all packets from server including
32             #  keepalives (big!)
33
34 my $socket = "/usr/local/freeside/selfservice_socket$tag";
35 my $pid_file = "$socket.pid";
36
37 my $log_file = "/usr/local/freeside/selfservice$tag.log";
38
39 my $lock_file = "/usr/local/freeside/selfservice$tag.writelock";
40
41 #my $me = '[client]';
42
43 $|=1;
44
45 $SIG{__WARN__} = \&_logmsg;
46 #$SIG{__DIE__} = sub { &_logmsg(@_); exit };
47
48 #read data to be cached or something
49 #warn "$me Reading init data\n" if $Debug;
50 #my $signup_init = 
51
52 warn "Creating $lock_file\n" if $Debug;
53 open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
54 close LOCKFILE;
55
56 warn "Creating $socket\n" if $Debug;
57 my $uaddr = sockaddr_un($socket);
58 my $proto = getprotobyname('tcp');
59 socket(Server,PF_UNIX,SOCK_STREAM,0) or die "socket: $!";
60 unlink($socket);
61 bind(Server, $uaddr) or die "bind: $!";
62 listen(Server,SOMAXCONN) or die "listen: $!";
63
64 if ( -e $pid_file ) {
65   open(PIDFILE,"<$pid_file");
66   my $old_pid = <PIDFILE>;
67   close PIDFILE;
68   if ( $old_pid =~ /^(\d+)$/ ) {
69     kill 'TERM', $1;
70   }
71 }
72 open(PIDFILE,">$pid_file");
73 print PIDFILE "$$\n";
74 close PIDFILE;
75
76 #my $waitedpid;
77 #sub REAPER { $waitedpid = wait; $SIG{CHLD} = \&REAPER; }
78 #$SIG{CHLD} =  \&REAPER;
79
80 warn "enabling keep alives\n" if $Debug;
81 nstore_fd( { _packet => '_enable_keepalive' } , \*STDOUT );
82
83 warn "entering main loop\n" if $Debug;
84
85 my %kids;
86 my %ftp_scan_dir;
87 my %ftp_scan_map;
88
89 my $s = new IO::Select;
90 $s->add(\*STDIN);
91 $s->add(\*Server);
92
93 #for ( $waitedpid = 0;
94 #      accept(Client,Server) || $waitedpid;
95 #      $waitedpid = 0, close Client)
96 #{
97 #  next if $waitedpid;
98
99 #$SIG{PIPE} = sub { warn "SIGPIPE received" };
100 #$SIG{CHLD} = sub { warn "SIGCHLD received" };
101
102 #sub REAPER { warn "SIGCHLD received"; my $pid = wait; $SIG{CHLD} = \&REAPER; }
103 #sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; }
104 #sub REAPER { my $pid = wait; delete $kids{$pid}; $SIG{CHLD} = \&REAPER; }
105 #$SIG{CHLD} =  \&REAPER;
106
107 my $undisp = 0;
108 while (1) {
109
110   &reap_kids;
111
112   warn "waiting for connection\n" if $Debug && !$undisp;
113
114   #my @handles = $s->can_read();
115   my @handles = $s->can_read(5);
116   $undisp = !scalar(@handles);
117   foreach my $handle ( @handles ) {
118
119     if ( $handle == \*STDIN ) {
120
121       warn "receiving packet from server\n" if $Debug > 3;
122
123       my $packet = fd_retrieve(\*STDIN);
124       my $token = $packet->{'_token'};
125
126       if ( $token eq '_keepalive' ) {
127         $undisp = 1;
128         next;
129       }
130
131       warn "received packet from server with token $token\n".
132            ( $Debug > 2
133              ? join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
134              : '' )
135         if $Debug;
136
137       if ( $token eq '_ftp_scan' ) {
138         if ( $ftp_scan_dir{$packet->{dir}} ) {
139           warn "already processing ". $packet->{dir}. "\n" if $Debug;
140         } else {
141           $ftp_scan_dir{$packet->{dir}} = 1;
142           spawn \&ftp_scan, $packet;
143         }
144         $undisp = 1;
145         next;
146       }
147
148       if ( exists($kids{$token}) ) {
149         warn "sending return packet to $token via $kids{$token}\n"
150           if $Debug;
151         nstore_fd($packet, $kids{$token});
152         warn "flushing to $token\n" if $Debug;
153         until ( $kids{$token}->flush ) {
154           warn "WARNING: error flushing: $!";
155           sleep 1;
156         }
157         #no close or delete here - will block waiting for child
158         warn "done with $token\n" if $Debug;
159       } else {
160         warn "WARNING: unknown token $token, discarding message";
161       }
162
163     } elsif ( $handle == \*Server ) {
164
165       until ( accept(Client, Server) ) {
166         warn "WARNING: accept failed: $!";
167         next;
168       }
169
170       warn "received local connection; forking\n" if $Debug;
171
172       spawn sub { #child
173         warn "[child-$$] reading packet from local client" if $Debug > 1;
174         my $packet = fd_retrieve(\*Client);
175         warn "[child-$$] packet received:\n".
176              join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
177           if $Debug > 2;
178         my $command = $packet->{'command'};
179         #handle some commands weirdly?
180         $packet->{_token}=$$;
181
182         my $rv = send_and_wait( $packet );
183
184         warn "[child-$$] closing write stream\n" if $Debug > 1;
185         close STDOUT or die "FATAL: can't close write stream: $!"; #??!
186
187         #close STDIN;
188
189         warn "[child-$$] sending response to local client" if $Debug > 1;
190         nstore_fd($rv, \*Client);
191         Client->flush or die "FATAL: can't flush to local client: $!";
192         close Client or die "FATAL: can't close connection to local client: $!";
193
194         warn "[child-$$] child exiting" if $Debug > 1;
195         exit;
196
197       }; #eo child
198
199       #close Client;
200
201     } else {
202       die "wtf?  $handle";
203     }
204
205   }
206   
207 }
208
209 sub reap_kids {
210   #warn "reaping kids\n";
211   foreach my $pid ( keys %kids ) {
212     my $kid = waitpid($pid, WNOHANG);
213     if ( $kid > 0 ) {
214       close $kids{$kid};
215       delete $kids{$kid};
216       if ( $ftp_scan_map{$kid} ) {
217         delete($ftp_scan_dir{$ftp_scan_map{$kid}});
218         delete($ftp_scan_map{$kid});
219       }
220     }
221   }
222   #warn "done reaping\n";
223 }
224
225 sub spawn {
226     my ( $coderef, $packet ) = ( shift, shift );
227
228     unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') {
229         use Carp;
230         confess "usage: spawn CODEREF";
231     }
232
233     my $pid;
234     #if (!defined($pid = fork)) {
235     my $kid = new IO::Handle;
236     if (!defined($pid = open($kid, '|-'))) {
237         warn "WARNING: cannot fork: $!";
238         return;
239     } elsif ($pid) {
240         warn "begat $pid" if $Debug;
241         $ftp_scan_map{$pid} = $packet->{dir} if $coderef == \&ftp_scan;
242         $kids{$pid} = $kid;
243         #$kids{$pid}->autoflush;
244         return; # I'm the parent
245     }
246     # else I'm the child -- go spawn
247
248 #    open(STDIN,  "<&Client")   || die "can't dup client to stdin";
249 #    open(STDOUT, ">&Client")   || die "can't dup client to stdout";
250 #     open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr";
251     exit &$coderef($packet);
252 }
253
254 sub _logmsg {
255   chomp( my $msg = shift );
256   my $log = new IO::File ">>$log_file";
257   die "can't open $log_file: $!" unless defined($log);
258   flock($log, LOCK_EX);
259   seek($log, 0, 2);
260   print $log "[client] [". scalar(localtime). "] [$$] $msg\n";
261   flock($log, LOCK_UN);
262   close $log;
263 }
264
265 sub send_and_wait {
266   my $packet = shift;
267
268   warn "[child-$$] locking write stream\n" if $Debug > 1;
269   lock_write;
270
271   warn "[child-$$] sending packet to remote server\n" if $Debug > 1;
272   nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!";
273         
274   warn "[child-$$] flushing write stream\n" if $Debug > 1;
275   STDOUT->flush or die "FATAL: can't flush: $!";
276         
277   warn "[child-$$] releasing write lock\n" if $Debug > 1;
278   unlock_write;
279
280   warn "[child-$$] waiting for response from parent\n" if $Debug > 1;
281   my $w = new IO::Select;
282   $w->add(\*STDIN);
283   until ( $w->can_read ) {
284     warn "[child-$$] WARNING: interrupted select: $!\n";
285   }
286
287   fd_retrieve(\*STDIN);
288 }
289
290 sub lock_write {
291   #broken on freebsd?
292   #flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!";
293
294   #open a new one for each kid to get a unique lock
295   open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
296
297   flock(LOCKFILE, LOCK_EX) or die "FATAL: can't lock $lock_file: $!";
298 }
299
300 sub unlock_write {
301   #broken on freebsd?
302   #flock(STDOUT, LOCK_UN) or die "FATAL: can't release write lock: $!";
303
304   flock(LOCKFILE, LOCK_UN) or die "FATAL: can't unlock $lock_file: $!";
305 }
306
307 sub ftp_scan {
308   my $packet = shift;
309
310   warn "[child-$$] performing ftp scan" if $Debug > 1;
311
312   warn "[child-$$] packet received:\n".
313        join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
314     if $Debug > 2;
315
316   $packet->{_token}=$$;
317
318   my $dir;
319   $packet->{dir} =~ /^(.*)$/ && ($dir = $1); # we trust ourselves
320   opendir(DIR, $dir) or die "failed to open directory $dir: $!\n";
321   my @files = grep(/\.csv$/, readdir(DIR));
322   closedir(DIR);
323
324   foreach my $file ( @files ) {
325     warn "Processing $file ...\n";
326     my $csv = Text::CSV_XS->new();
327     my $err = "";
328     my @records = ();
329     open(CSV, "<$dir/$file") or die "can't open input file for $file: $!\n";
330     open(RESULT, ">$dir/result/$file")
331       or die "can't open result file for $file: $!\n";
332
333     while (<CSV>) {
334       if ( $csv->parse($_) ) {
335         my @columns = $csv->fields();
336         push(@records, \@columns);
337       } else {
338         $err = $csv->error_input;
339         last;
340       }
341     }
342     close(CSV);
343     if ( $err ) {
344       rename("$dir/$file", "$dir/rejected/$file");
345     } else {
346       foreach my $record ( @records ) {
347
348         $packet->{row} = $record;
349         $packet->{_packet} = 'Bulk/processrow';
350         my $result = send_and_wait( $packet );
351
352         if ( $result->{error} ) {
353           my $name;
354           $record->[1] =~ /^(\w+)$/ && ( $name = $1 );
355
356           if ($name) {
357             my $filename = "$dir/rejected/$name";
358             open(REC, ">$filename") or die "can't open $filename: $!\n";
359             print REC join(',', @$record);
360             close REC or die $!;
361             open(ERR, ">$filename.err") or die "can't open $filename.err: $!\n";
362             print ERR $result->{error};
363             close ERR or die $!;
364           }else{
365             warn "bad agent_custid";
366           }
367
368         }
369         print RESULT $result->{message}, "\n";
370       }
371
372       rename("$dir/$file", "$dir/processed/$file");
373       warn "$file processed.\n" if $Debug;
374     }
375     close(RESULT);
376   }
377
378   close STDOUT or die "FATAL: can't close write stream: $!"; #??!
379
380   warn "[child-$$] child exiting" if $Debug > 1;
381   exit;
382
383 }