# This is run REMOTELY over ssh by freeside-selfservice-server
use strict;
-use subs qw(spawn logmsg);
+use subs qw(spawn logmsg lock_write unlock_write);
use Fcntl qw(:flock);
+use POSIX qw(:sys_wait_h);
use Socket;
-use Storable qw(nstore_fd fd_retrieve);
-use IO::Handle;
+use Storable 2.09 qw(nstore_fd fd_retrieve);
+use IO::Handle qw(_IONBF);
use IO::Select;
-use IPC::Open2;
+use IO::File;
+use Text::CSV_XS;
-use LockFile::Simple qw(lock unlock);
+#STDOUT->setbuf('');
+
+my $tag = scalar(@ARGV) ? '.'.shift : '';
use vars qw( $Debug );
-$Debug = 2;
+$Debug = 2; #2 will turn on child logging
+ #3 will log packet contents,#including passwords
+ #4 will log receipts of all packets from server including
+ # keepalives (big!)
-my $socket = "/usr/local/freeside/selfservice_socket";
+my $socket = "/usr/local/freeside/selfservice_socket$tag";
my $pid_file = "$socket.pid";
-my $lock_file = "$socket.lock";
-unlink $lock_file;
-my $me = '[client]';
+my $log_file = "/usr/local/freeside/selfservice$tag.log";
+
+my $lock_file = "/usr/local/freeside/selfservice$tag.writelock";
+
+#my $me = '[client]';
$|=1;
+$SIG{__WARN__} = \&_logmsg;
+#$SIG{__DIE__} = sub { &_logmsg(@_); exit };
+
#read data to be cached or something
#warn "$me Reading init data\n" if $Debug;
#my $signup_init =
-warn "[client] Creating $socket\n" if $Debug;
+warn "Creating $lock_file\n" if $Debug;
+open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
+close LOCKFILE;
+
+warn "Creating $socket\n" if $Debug;
my $uaddr = sockaddr_un($socket);
my $proto = getprotobyname('tcp');
socket(Server,PF_UNIX,SOCK_STREAM,0) or die "socket: $!";
if ( -e $pid_file ) {
open(PIDFILE,"<$pid_file");
- #chomp( my $old_pid = <PIDFILE> );
my $old_pid = <PIDFILE>;
close PIDFILE;
- $old_pid =~ /^(\d+)$/;
- kill 'TERM', $1;
+ if ( $old_pid =~ /^(\d+)$/ ) {
+ kill 'TERM', $1;
+ }
}
open(PIDFILE,">$pid_file");
print PIDFILE "$$\n";
#sub REAPER { $waitedpid = wait; $SIG{CHLD} = \&REAPER; }
#$SIG{CHLD} = \&REAPER;
-warn "[client] entering main loop\n" if $Debug;
+warn "enabling keep alives\n" if $Debug;
+nstore_fd( { _packet => '_enable_keepalive' } , \*STDOUT );
-#sub spawn;
-#sub logmsg;
+warn "entering main loop\n" if $Debug;
my %kids;
+my %ftp_scan_dir;
+my %ftp_scan_map;
- # my $gar = <STDIN>;
-
-#my $s = new IO::Select;
-#$s->add(\*STDIN);
-#$s->add(\*Server);
+my $s = new IO::Select;
+$s->add(\*STDIN);
+$s->add(\*Server);
#for ( $waitedpid = 0;
# accept(Client,Server) || $waitedpid;
#$SIG{PIPE} = sub { warn "SIGPIPE received" };
#$SIG{CHLD} = sub { warn "SIGCHLD received" };
-sub REAPER { warn "SIGCHLD received"; my $pid = wait; $SIG{CHLD} = \&REAPER; }
+#sub REAPER { warn "SIGCHLD received"; my $pid = wait; $SIG{CHLD} = \&REAPER; }
+#sub REAPER { my $pid = wait; $SIG{CHLD} = \&REAPER; }
#sub REAPER { my $pid = wait; delete $kids{$pid}; $SIG{CHLD} = \&REAPER; }
-$SIG{CHLD} = \&REAPER;
-
-warn "[client] creating IO::Select\n" if $Debug;
-my $s = new IO::Select;
-$s->add(\*STDIN);
-$s->add(\*Server);
+#$SIG{CHLD} = \&REAPER;
+my $undisp = 0;
while (1) {
-warn "[client] waiting for connection or token\n" if $Debug;
-while ( my @handles = $s->can_read ) {
+ &reap_kids;
+
+ warn "waiting for connection\n" if $Debug && !$undisp;
+ #my @handles = $s->can_read();
+ my @handles = $s->can_read(5);
+ $undisp = !scalar(@handles);
foreach my $handle ( @handles ) {
if ( $handle == \*STDIN ) {
-# my $gar = <STDIN>;
-# die $gar;
+ warn "receiving packet from server\n" if $Debug > 3;
my $packet = fd_retrieve(\*STDIN);
my $token = $packet->{'_token'};
- warn "[client] received packet with token $token\n".
- join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
+
+ if ( $token eq '_keepalive' ) {
+ $undisp = 1;
+ next;
+ }
+
+ warn "received packet from server with token $token\n".
+ ( $Debug > 2
+ ? join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
+ : '' )
if $Debug;
- if ( exists($kids{$token}) ) {
- warn "[client] sending return packet to $token via $kids{$token}\n"
+
+ if ( $token eq '_ftp_scan' ) {
+ if ( $ftp_scan_dir{$packet->{dir}} ) {
+ warn "already processing ". $packet->{dir}. "\n" if $Debug;
+ } else {
+ $ftp_scan_dir{$packet->{dir}} = 1;
+ spawn \&ftp_scan, $packet;
+ }
+ $undisp = 1;
+ next;
+ }
+
+ if ( exists($kids{$token}) ) {
+ warn "sending return packet to $token via $kids{$token}\n"
if $Debug;
nstore_fd($packet, $kids{$token});
- warn "[client] flushing $kids{$token}\n" if $Debug;
- $kids{$token}->flush;
- #eval { $kids{$token}->flush; };
- #die "error flushing?!?!? $@\n" if $@ ne '';
- #warn "[client] closing $kids{$token}\n";
- #close $kids{$token};
- #warn "[client] deleting $kids{$token}\n";
- #delete $kids{$token};
- warn "[client] done with $token\n" if $Debug;
+ warn "flushing to $token\n" if $Debug;
+ until ( $kids{$token}->flush ) {
+ warn "WARNING: error flushing: $!";
+ sleep 1;
+ }
+ #no close or delete here - will block waiting for child
+ warn "done with $token\n" if $Debug;
} else {
- warn "[client] WARNING: unknown token $token, discarding message";
- #die "[client] FATAL: unknown token $token, discarding message";
+ warn "WARNING: unknown token $token, discarding message";
}
} elsif ( $handle == \*Server ) {
- warn "[client] received local connection; forking\n" if $Debug;
+ until ( accept(Client, Server) ) {
+ warn "WARNING: accept failed: $!";
+ next;
+ }
- accept(Client, Server);
+ warn "received local connection; forking\n" if $Debug;
spawn sub { #child
- warn "[client-$$] reading packet from local client" if $Debug > 1;
+ warn "[child-$$] reading packet from local client" if $Debug > 1;
my $packet = fd_retrieve(\*Client);
- warn "[client-$$] packet received:\n".
+ warn "[child-$$] packet received:\n".
join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
- if $Debug > 1;
+ if $Debug > 2;
my $command = $packet->{'command'};
#handle some commands weirdly?
- $packet->{_token}=$$; #??
-
- warn "[client-$$] sending packet to remote server" if $Debug > 1;
- flock(STDOUT, LOCK_EX); #acquire write lock
- #lock($lock_file);
- nstore_fd($packet, \*STDOUT);
- STDOUT->flush;
- #unlock($lock_file);
- flock(STDOUT, LOCK_UN); #release write lock
+ $packet->{_token}=$$;
- warn "[client-$$] waiting for response from parent" if $Debug > 1;
+ my $rv = send_and_wait( $packet );
- #block until parent has a message
- my $w = new IO::Select;
- $w->add(\*STDIN);
- my @wait = $w->can_read;
- my $rv = fd_retrieve(\*STDIN);
+ warn "[child-$$] closing write stream\n" if $Debug > 1;
+ close STDOUT or die "FATAL: can't close write stream: $!"; #??!
#close STDIN;
- warn "[client-$$] sending response to local client" if $Debug > 1;
-
- #send message to local client
+ warn "[child-$$] sending response to local client" if $Debug > 1;
nstore_fd($rv, \*Client);
- Client->flush;
-
- close Client;
+ Client->flush or die "FATAL: can't flush to local client: $!";
+ close Client or die "FATAL: can't close connection to local client: $!";
- warn "[client-$$] child exiting" if $Debug > 1;
-
- #while (1) { sleep 5 };
- #sleep 5;
+ warn "[child-$$] child exiting" if $Debug > 1;
exit;
}; #eo child
- #close Client; #in parent, right?
+ #close Client;
} else {
die "wtf? $handle";
}
}
-
- warn "[client] done handling messages; returning to wait-state" if $Debug;;
-
+
}
-#die "[client] died unexpectedly: $!\n";
-warn "[client] fell-through unexpectedly: $!\n" if $Debug;
-
-} #WTF?
+sub reap_kids {
+ #warn "reaping kids\n";
+ foreach my $pid ( keys %kids ) {
+ my $kid = waitpid($pid, WNOHANG);
+ if ( $kid > 0 ) {
+ close $kids{$kid};
+ delete $kids{$kid};
+ if ( $ftp_scan_map{$kid} ) {
+ delete($ftp_scan_dir{$ftp_scan_map{$kid}});
+ delete($ftp_scan_map{$kid});
+ }
+ }
+ }
+ #warn "done reaping\n";
+}
sub spawn {
- my $coderef = shift;
+ my ( $coderef, $packet ) = ( shift, shift );
unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') {
use Carp;
#if (!defined($pid = fork)) {
my $kid = new IO::Handle;
if (!defined($pid = open($kid, '|-'))) {
- logmsg "WARNING: cannot fork: $!";
+ warn "WARNING: cannot fork: $!";
return;
} elsif ($pid) {
- logmsg "begat $pid" if $Debug;
+ warn "begat $pid" if $Debug;
+ $ftp_scan_map{$pid} = $packet->{dir} if $coderef == \&ftp_scan;
$kids{$pid} = $kid;
#$kids{$pid}->autoflush;
return; # I'm the parent
# open(STDIN, "<&Client") || die "can't dup client to stdin";
# open(STDOUT, ">&Client") || die "can't dup client to stdout";
- ## open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr";
- exit &$coderef();
+# open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr";
+ exit &$coderef($packet);
+}
+
+sub _logmsg {
+ chomp( my $msg = shift );
+ my $log = new IO::File ">>$log_file";
+ die "can't open $log_file: $!" unless defined($log);
+ flock($log, LOCK_EX);
+ seek($log, 0, 2);
+ print $log "[client] [". scalar(localtime). "] [$$] $msg\n";
+ flock($log, LOCK_UN);
+ close $log;
+}
+
+sub send_and_wait {
+ my $packet = shift;
+
+ warn "[child-$$] locking write stream\n" if $Debug > 1;
+ lock_write;
+
+ warn "[child-$$] sending packet to remote server\n" if $Debug > 1;
+ nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!";
+
+ warn "[child-$$] flushing write stream\n" if $Debug > 1;
+ STDOUT->flush or die "FATAL: can't flush: $!";
+
+ warn "[child-$$] releasing write lock\n" if $Debug > 1;
+ unlock_write;
+
+ warn "[child-$$] waiting for response from parent\n" if $Debug > 1;
+ my $w = new IO::Select;
+ $w->add(\*STDIN);
+ until ( $w->can_read ) {
+ warn "[child-$$] WARNING: interrupted select: $!\n";
+ }
+
+ fd_retrieve(\*STDIN);
}
-#sub logmsg { print "$0 $$: @_ at ", scalar localtime, "\n" }
-#DON'T PRINT!!!!!
-sub logmsg { warn "[client] $0 $$: @_ at ", scalar localtime, "\n" }
+sub lock_write {
+ #broken on freebsd?
+ #flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!";
+
+ #open a new one for each kid to get a unique lock
+ open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
+
+ flock(LOCKFILE, LOCK_EX) or die "FATAL: can't lock $lock_file: $!";
+}
+
+sub unlock_write {
+ #broken on freebsd?
+ #flock(STDOUT, LOCK_UN) or die "FATAL: can't release write lock: $!";
+
+ flock(LOCKFILE, LOCK_UN) or die "FATAL: can't unlock $lock_file: $!";
+}
+
+sub ftp_scan {
+ my $packet = shift;
+
+ warn "[child-$$] performing ftp scan" if $Debug > 1;
+ warn "[child-$$] packet received:\n".
+ join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
+ if $Debug > 2;
+
+ $packet->{_token}=$$;
+
+ my $dir;
+ $packet->{dir} =~ /^(.*)$/ && ($dir = $1); # we trust ourselves
+ opendir(DIR, $dir) or die "failed to open directory $dir: $!\n";
+ my @files = grep(/\.csv$/, readdir(DIR));
+ closedir(DIR);
+
+ foreach my $file ( @files ) {
+ warn "Processing $file ...\n";
+ my $csv = Text::CSV_XS->new();
+ my $err = "";
+ my @records = ();
+ open(CSV, "<$dir/$file") or die "can't open input file for $file: $!\n";
+ open(RESULT, ">$dir/result/$file")
+ or die "can't open result file for $file: $!\n";
+
+ while (<CSV>) {
+ if ( $csv->parse($_) ) {
+ my @columns = $csv->fields();
+ push(@records, \@columns);
+ } else {
+ $err = $csv->error_input;
+ last;
+ }
+ }
+ close(CSV);
+ if ( $err ) {
+ rename("$dir/$file", "$dir/rejected/$file");
+ } else {
+ foreach my $record ( @records ) {
+
+ $packet->{row} = $record;
+ $packet->{_packet} = 'Bulk/processrow';
+ my $result = send_and_wait( $packet );
+
+ if ( $result->{error} ) {
+ my $name;
+ $record->[1] =~ /^(\w+)$/ && ( $name = $1 );
+
+ if ($name) {
+ my $filename = "$dir/rejected/$name";
+ open(REC, ">$filename") or die "can't open $filename: $!\n";
+ print REC join(',', @$record);
+ close REC or die $!;
+ open(ERR, ">$filename.err") or die "can't open $filename.err: $!\n";
+ print ERR $result->{error};
+ close ERR or die $!;
+ }else{
+ warn "bad agent_custid";
+ }
+
+ }
+ print RESULT $result->{message}, "\n";
+ }
+
+ rename("$dir/$file", "$dir/processed/$file");
+ warn "$file processed.\n" if $Debug;
+ }
+ close(RESULT);
+ }
+
+ close STDOUT or die "FATAL: can't close write stream: $!"; #??!
+
+ warn "[child-$$] child exiting" if $Debug > 1;
+ exit;
+
+}