3 # freeside-selfservice-clientd
5 # This is run REMOTELY over ssh by freeside-selfservice-server
8 use subs qw(spawn logmsg lock_write unlock_write);
10 use POSIX qw(:sys_wait_h);
12 use Storable 2.09 qw(nstore_fd fd_retrieve);
13 use IO::Handle qw(_IONBF);
# Instance configuration and logging hook.
# NOTE(review): every line in this listing carries a leading number and those
# numbers skip values, so some statements of the script are likely not shown
# here; the comments below describe only the visible lines.
#
# An optional first command-line argument is turned into a ".<arg>" suffix
# that is interpolated into the socket, log and lock file names below.
20 my $tag = scalar(@ARGV) ? '.'.shift : '';
# Verbosity level consulted by the "warn ... if $Debug" statements throughout
# the script (plain truth test, "> 1" and "> 3" comparisons are all used).
22 use vars qw( $Debug );
23 $Debug = 2; #2 will turn on child logging
24 #3 will log packet contents,#including passwords
25 #4 will log receipts of all packets from server including
# Fixed filesystem locations; the pid file name is derived from the socket path.
28 my $socket = "/usr/local/freeside/selfservice_socket$tag";
29 my $pid_file = "$socket.pid";
31 my $log_file = "/usr/local/freeside/selfservice$tag.log";
33 my $lock_file = "/usr/local/freeside/selfservice$tag.writelock";
# Install _logmsg as the warning handler: the message of every warn() in this
# script is handed to _logmsg instead of being printed by perl itself.
39 $SIG{__WARN__} = \&_logmsg;
40 #$SIG{__DIE__} = sub { &_logmsg(@_); exit };
# Startup sequence: create the write-lock file, create and listen on the local
# Unix-domain socket, handle the pid file, then ask the remote side to start
# sending keepalives.
# NOTE(review): some statements between the visible lines are not shown in
# this listing (for example the body of the pid-file "if" below).
42 #read data to be cached or something
43 #warn "$me Reading init data\n" if $Debug;
# Opening with ">" creates the lock file, or truncates it if it already exists.
46 warn "Creating $lock_file\n" if $Debug;
47 open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
# Listening socket for local clients: a SOCK_STREAM socket in the PF_UNIX
# domain bound to the $socket path.
# NOTE(review): $proto is not used by any visible line (socket() is passed 0
# as its protocol argument) -- confirm whether it can be dropped.
50 warn "Creating $socket\n" if $Debug;
51 my $uaddr = sockaddr_un($socket);
52 my $proto = getprotobyname('tcp');
53 socket(Server,PF_UNIX,SOCK_STREAM,0) or die "socket: $!";
55 bind(Server, $uaddr) or die "bind: $!";
56 listen(Server,SOMAXCONN) or die "listen: $!";
# Pid file handling: read whatever pid a previous instance recorded.
# NOTE(review): this open is unchecked; if the pid file does not exist the
# read below yields undef and the pattern match simply fails.
59 open(PIDFILE,"<$pid_file");
60 my $old_pid = <PIDFILE>;
# Only a value consisting solely of digits is accepted as an old pid; what is
# done with it is on lines not shown in this listing.
62 if ( $old_pid =~ /^(\d+)$/ ) {
# Reopen the pid file for writing (truncating it).
# NOTE(review): the result of this open is not checked either -- a failure to
# create the pid file would go unnoticed here.
66 open(PIDFILE,">$pid_file");
71 #sub REAPER { $waitedpid = wait; $SIG{CHLD} = \&REAPER; }
72 #$SIG{CHLD} = \&REAPER;
# Send a Storable-serialized control packet on STDOUT. Per the header comment
# of this script the process is run over ssh by freeside-selfservice-server,
# so STDOUT is presumably the channel to that server -- TODO confirm.
74 warn "enabling keep alives\n" if $Debug;
75 nstore_fd( { _packet => '_enable_keepalive' } , \*STDOUT );
77 warn "entering main loop\n" if $Debug;
# Main dispatch loop.
#
# Each pass polls an IO::Select set and handles two kinds of readable handle:
#   * STDIN  - a Storable packet arriving from the server; it is either a
#              keepalive, an '_ftp_scan' request, or a reply that is relayed
#              to the child recorded in %kids under the packet's _token.
#   * Server - a new local client connection; a child reads the client's
#              request, forwards it with send_and_wait() and writes the reply
#              back to the client.
# After the handles are processed, finished children are reaped without
# blocking.
#
# NOTE(review): the loop header, the calls that add handles to $s, the
# declarations of %kids / %ftp_scan_dir / %ftp_scan_map / $undisp and many
# closing braces are on lines not shown in this listing.
83 my $s = new IO::Select;
87 #for ( $waitedpid = 0;
88 # accept(Client,Server) || $waitedpid;
89 # $waitedpid = 0, close Client)
93 #$SIG{PIPE} = sub { warn "SIGPIPE received" };
94 #$SIG{CHLD} = sub { warn "SIGCHLD received" };
96 #sub REAPER { warn "SIGCHLD received"; my $pid = wait; $SIG{CHLD} = \&REAPER; }
97 #sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; }
98 #sub REAPER { my $pid = wait; delete $kids{$pid}; $SIG{CHLD} = \&REAPER; }
99 #$SIG{CHLD} = \&REAPER;
# $undisp is set below when a poll returned no handles; it suppresses this
# message on the following pass so an idle loop does not repeat it.
106 warn "waiting for connection\n" if $Debug && !$undisp;
108 #my @handles = $s->can_read();
# Wait at most 5 seconds for a readable handle, so the reaping code further
# down still runs when nothing arrives.
109 my @handles = $s->can_read(5);
110 $undisp = !scalar(@handles);
111 foreach my $handle ( @handles ) {
# Handles are identified by comparing glob references numerically.
113 if ( $handle == \*STDIN ) {
115 warn "receiving packet from server\n" if $Debug > 3;
# Deserialize one packet from the server; its _token field selects the
# handling below.
117 my $packet = fd_retrieve(\*STDIN);
118 my $token = $packet->{'_token'};
# Keepalive packets get their own branch (its body is not shown here).
120 if ( $token eq '_keepalive' ) {
125 warn "received packet from server with token $token\n".
127 ? join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
# Server-initiated directory scan: %ftp_scan_dir marks directories that are
# already being processed so the same directory is not scanned twice at once.
131 if ( $token eq '_ftp_scan' ) {
132 if ( $ftp_scan_dir{$packet->{dir}} ) {
133 warn "already processing ". $packet->{dir}. "\n" if $Debug;
135 $ftp_scan_dir{$packet->{dir}} = 1;
136 spawn \&ftp_scan, $packet;
# Otherwise the token is looked up in %kids, whose values are used as
# writable handles: the packet is serialized to that handle and flushed.
# Children stamp outgoing packets with their own pid as _token, so the keys
# are presumably child pids -- the line filling %kids is not shown.
142 if ( exists($kids{$token}) ) {
143 warn "sending return packet to $token via $kids{$token}\n"
145 nstore_fd($packet, $kids{$token});
146 warn "flushing to $token\n" if $Debug;
# Retry the flush until it reports success.
147 until ( $kids{$token}->flush ) {
148 warn "WARNING: error flushing: $!";
151 #no close or delete here - will block waiting for child
152 warn "done with $token\n" if $Debug;
# A reply whose token matches no known child is dropped with a warning.
154 warn "WARNING: unknown token $token, discarding message";
157 } elsif ( $handle == \*Server ) {
# Keep retrying accept() until a connection is obtained.
159 until ( accept(Client, Server) ) {
160 warn "WARNING: accept failed: $!";
164 warn "received local connection; forking\n" if $Debug;
# From here on the "[child-$$]" log prefix indicates code executed in the
# forked child; the fork/spawn call itself is on a line not shown.
167 warn "[child-$$] reading packet from local client" if $Debug > 1;
168 my $packet = fd_retrieve(\*Client);
169 warn "[child-$$] packet received:\n".
170 join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
# NOTE(review): $command is not used by any visible line.
172 my $command = $packet->{'command'};
173 #handle some commands weirdly?
# Stamp the request with this process's pid so the reply can be routed back
# through the %kids lookup in the STDIN branch above.
174 $packet->{_token}=$$;
# Forward the request and wait for the matching reply.
176 my $rv = send_and_wait( $packet );
178 warn "[child-$$] closing write stream\n" if $Debug > 1;
179 close STDOUT or die "FATAL: can't close write stream: $!"; #??!
# Hand the reply to the local client, then close the connection.
183 warn "[child-$$] sending response to local client" if $Debug > 1;
184 nstore_fd($rv, \*Client);
185 Client->flush or die "FATAL: can't flush to local client: $!";
186 close Client or die "FATAL: can't close connection to local client: $!";
188 warn "[child-$$] child exiting" if $Debug > 1;
# Reap finished children without blocking: WNOHANG makes waitpid return
# immediately when the child has not exited yet.
204 #warn "reaping kids\n";
205 foreach my $pid ( keys %kids ) {
206 my $kid = waitpid($pid, WNOHANG);
# If the reaped pid belonged to an ftp scan, forget both the pid-to-directory
# mapping and the "directory in progress" marker so it can be scanned again.
210 if ( $ftp_scan_map{$kid} ) {
211 delete($ftp_scan_dir{$ftp_scan_map{$kid}});
212 delete($ftp_scan_map{$kid});
216 #warn "done reaping\n";
# Body of the child-spawning helper (called above as "spawn \&ftp_scan,
# $packet"; the "sub" header and closing braces are not shown in this listing).
#
# Arguments: a code reference to run in the child, and a packet that is passed
# to that code reference.
# Parent: returns nothing after recording bookkeeping for the new child.
# Child: runs the code reference and exits with its return value.
220 my ( $coderef, $packet ) = ( shift, shift );
# After the two shifts @_ must be empty and the first argument must be an
# unblessed code reference; otherwise die with a stack trace.
# NOTE(review): the usage text mentions only CODEREF although a second
# (packet) argument is accepted.
222 unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') {
224 confess "usage: spawn CODEREF";
228 #if (!defined($pid = fork)) {
# open() with the '|-' mode forks: in the parent it returns the child's pid and
# $kid becomes a write handle connected to the child's STDIN; in the child it
# returns 0. An undefined result means the fork failed.
229 my $kid = new IO::Handle;
230 if (!defined($pid = open($kid, '|-'))) {
231 warn "WARNING: cannot fork: $!";
234 warn "begat $pid" if $Debug;
# Remember which directory an ftp_scan child is working on, keyed by its pid,
# so the entry can be cleared when that child is reaped.
235 $ftp_scan_map{$pid} = $packet->{dir} if $coderef == \&ftp_scan;
237 #$kids{$pid}->autoflush;
238 return; # I'm the parent
240 # else I'm the child -- go spawn
242 # open(STDIN, "<&Client") || die "can't dup client to stdin";
243 # open(STDOUT, ">&Client") || die "can't dup client to stdout";
244 # open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr";
# The child's exit status is whatever the code reference returns.
245 exit &$coderef($packet);
# Log-writing routine: appends one line to $log_file in the form
#   [client] [<local time>] [<pid>] <message>
# Presumably this is the body of _logmsg, the handler installed in
# $SIG{__WARN__} -- the "sub" header is not shown in this listing.
#
# Argument: the message text; a trailing newline is removed before formatting.
# Dies if the log file cannot be opened for appending.
249 chomp( my $msg = shift );
250 my $log = new IO::File ">>$log_file";
251 die "can't open $log_file: $!" unless defined($log);
# Take an exclusive lock around the write so that lines from concurrently
# running processes do not interleave.
# NOTE(review): the return values of both flock calls are ignored.
252 flock($log, LOCK_EX);
254 print $log "[client] [". scalar(localtime). "] [$$] $msg\n";
255 flock($log, LOCK_UN);
# Body of send_and_wait (called as "send_and_wait( $packet )" elsewhere in this
# file; the "sub" header and the line that declares $packet are not shown).
#
# Serializes $packet to STDOUT, flushes it, then blocks until a reply can be
# read from STDIN and deserializes it.
# Returns: the structure read by fd_retrieve, assuming that call is the last
# statement of the sub -- TODO confirm against the full file.
# Dies if the packet cannot be written or flushed.
#
# The log messages mention taking and releasing a write lock around the send;
# the calls doing so are presumably on lines not shown (lock_write and
# unlock_write are predeclared at the top of the file).
262 warn "[child-$$] locking write stream\n" if $Debug > 1;
265 warn "[child-$$] sending packet to remote server\n" if $Debug > 1;
266 nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!";
268 warn "[child-$$] flushing write stream\n" if $Debug > 1;
269 STDOUT->flush or die "FATAL: can't flush: $!";
271 warn "[child-$$] releasing write lock\n" if $Debug > 1;
# Wait (with no timeout) until the select set reports something readable,
# retrying whenever can_read comes back empty. The call that adds a handle to
# $w is not shown; given the read below it is presumably STDIN.
274 warn "[child-$$] waiting for response from parent\n" if $Debug > 1;
275 my $w = new IO::Select;
277 until ( $w->can_read ) {
278 warn "[child-$$] WARNING: interrupted select: $!\n";
281 fd_retrieve(\*STDIN);
# Acquire the write lock (presumably the body of lock_write -- the "sub"
# header is not shown in this listing).
#
# The lock is an exclusive flock on $lock_file rather than on STDOUT (the
# STDOUT variant is left commented out). The file is reopened on each call so
# that the calling process holds its own file handle for the lock.
# flock with LOCK_EX blocks until the lock is available.
# Dies if the lock file cannot be opened or locked.
286 #flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!";
288 #open a new one for each kid to get a unique lock
289 open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
291 flock(LOCKFILE, LOCK_EX) or die "FATAL: can't lock $lock_file: $!";
# Release the write lock (presumably the body of unlock_write -- the "sub"
# header is not shown in this listing).
#
# Unlocks the LOCKFILE handle; the handle is not closed by any visible line.
# Dies if the unlock fails.
296 #flock(STDOUT, LOCK_UN) or die "FATAL: can't release write lock: $!";
298 flock(LOCKFILE, LOCK_UN) or die "FATAL: can't unlock $lock_file: $!";
# Body of ftp_scan, the routine run in a spawned child for '_ftp_scan' packets
# (the "sub" header, the line declaring $packet, the CSV read loop and several
# closing braces are not shown in this listing).
#
# Visible behavior:
#   1. Lists the *.csv files in the directory named by $packet->{dir}.
#   2. Parses each file with Text::CSV_XS, collecting rows into @records.
#   3. Sends every row to the server as a 'Bulk/processrow' packet via
#      send_and_wait and writes the returned message to result/<file>.
#   4. For a row whose result carries an error, writes the row and the error
#      text under rejected/.
#   5. Renames the input file into processed/ (or rejected/ -- the condition
#      selecting that path is not shown).
# Dies if the directory or any of the files cannot be opened.
304 warn "[child-$$] performing ftp scan" if $Debug > 1;
306 warn "[child-$$] packet received:\n".
307 join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
# Stamp the packet with this child's pid; the main loop uses _token to route
# replies back to a child.
310 $packet->{_token}=$$;
# The pattern accepts any single-line value, so this copies the directory name
# through a capture without restricting it (see the original remark).
313 $packet->{dir} =~ /^(.*)$/ && ($dir = $1); # we trust ourselves
314 opendir(DIR, $dir) or die "failed to open directory $dir: $!\n";
# Only directory entries whose names end in ".csv" are processed.
315 my @files = grep(/\.csv$/, readdir(DIR));
318 foreach my $file ( @files ) {
319 warn "Processing $file ...\n";
320 my $csv = Text::CSV_XS->new();
# Input is read from <dir>/<file>; per-row results go to <dir>/result/<file>.
323 open(CSV, "<$dir/$file") or die "can't open input file for $file: $!\n";
324 open(RESULT, ">$dir/result/$file")
325 or die "can't open result file for $file: $!\n";
# Parse the current line ($_ -- the enclosing read loop is not shown). A
# successfully parsed line is stored as an array reference of its fields; for
# a line that fails to parse the offending input is kept in $err.
328 if ( $csv->parse($_) ) {
329 my @columns = $csv->fields();
330 push(@records, \@columns);
332 $err = $csv->error_input;
# Move the input file to rejected/ (presumably when a parse error was
# recorded -- the guarding condition is not shown).
338 rename("$dir/$file", "$dir/rejected/$file");
# Submit each parsed row to the server, one packet per row, reusing the same
# packet hash.
340 foreach my $record ( @records ) {
342 $packet->{row} = $record;
343 $packet->{_packet} = 'Bulk/processrow';
344 my $result = send_and_wait( $packet );
346 if ( $result->{error} ) {
# The second column is used as the reject file name, but only when it
# consists solely of word characters; the "bad agent_custid" warning below
# suggests this column is an agent_custid -- TODO confirm.
348 $record->[1] =~ /^(\w+)$/ && ( $name = $1 );
351 my $filename = "$dir/rejected/$name";
# NOTE(review): fields are joined with bare commas, without CSV quoting.
352 open(REC, ">$filename") or die "can't open $filename: $!\n";
353 print REC join(',', @$record);
# The server's error text is stored next to the rejected row, in <name>.err.
355 open(ERR, ">$filename.err") or die "can't open $filename.err: $!\n";
356 print ERR $result->{error};
359 warn "bad agent_custid";
# Record the server's message for this row in the result file.
363 print RESULT $result->{message}, "\n";
# Move the input file to processed/.
366 rename("$dir/$file", "$dir/processed/$file");
367 warn "$file processed.\n" if $Debug;
372 close STDOUT or die "FATAL: can't close write stream: $!"; #??!
374 warn "[child-$$] child exiting" if $Debug > 1;