X-Git-Url: http://git.freeside.biz/gitweb/?p=freeside.git;a=blobdiff_plain;f=fs_selfservice%2FFS-SelfService%2Ffreeside-selfservice-clientd;fp=fs_selfservice%2FFS-SelfService%2Ffreeside-selfservice-clientd;h=0819d9d67210305fc5c74f7862bba29fdf0f215d;hp=bdc8e15475ddf2a6286e8506e74571f51b539671;hb=15e57a4859d967a13113602b112c4aa197ca6002;hpb=12ec6818dfd015042f516821c35ac68846b6baa0 diff --git a/fs_selfservice/FS-SelfService/freeside-selfservice-clientd b/fs_selfservice/FS-SelfService/freeside-selfservice-clientd index bdc8e1547..0819d9d67 100644 --- a/fs_selfservice/FS-SelfService/freeside-selfservice-clientd +++ b/fs_selfservice/FS-SelfService/freeside-selfservice-clientd @@ -13,6 +13,7 @@ use Storable 2.09 qw(nstore_fd fd_retrieve); use IO::Handle qw(_IONBF); use IO::Select; use IO::File; +use Text::CSV_XS; #STDOUT->setbuf(''); @@ -36,6 +37,7 @@ my $lock_file = "/usr/local/freeside/selfservice$tag.writelock"; $|=1; $SIG{__WARN__} = \&_logmsg; +#$SIG{__DIE__} = sub { &_logmsg(@_); exit }; #read data to be cached or something #warn "$me Reading init data\n" if $Debug; @@ -75,6 +77,8 @@ nstore_fd( { _packet => '_enable_keepalive' } , \*STDOUT ); warn "entering main loop\n" if $Debug; my %kids; +my %ftp_scan_dir; +my %ftp_scan_map; my $s = new IO::Select; $s->add(\*STDIN); @@ -124,7 +128,18 @@ while (1) { : '' ) if $Debug; - if ( exists($kids{$token}) ) { + if ( $token eq '_ftp_scan' ) { + if ( $ftp_scan_dir{$packet->{dir}} ) { + warn "already processing ". $packet->{dir}. "\n" if $Debug; + } else { + $ftp_scan_dir{$packet->{dir}} = 1; + spawn \&ftp_scan, $packet; + } + $undisp = 1; + next; + } + + if ( exists($kids{$token}) ) { warn "sending return packet to $token via $kids{$token}\n" if $Debug; nstore_fd($packet, $kids{$token}); @@ -158,29 +173,11 @@ while (1) { #handle some commands weirdly? $packet->{_token}=$$; - warn "[child-$$] locking write stream\n" if $Debug > 1; - lock_write; - - warn "[child-$$] sending packet to remote server\n" if $Debug > 1; - nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!"; - - warn "[child-$$] flushing write stream\n" if $Debug > 1; - STDOUT->flush or die "FATAL: can't flush: $!"; - - warn "[child-$$] releasing write lock\n" if $Debug > 1; - unlock_write; + my $rv = send_and_wait( $packet ); warn "[child-$$] closing write stream\n" if $Debug > 1; close STDOUT or die "FATAL: can't close write stream: $!"; #??! - warn "[child-$$] waiting for response from parent\n" if $Debug > 1; - my $w = new IO::Select; - $w->add(\*STDIN); - until ( $w->can_read ) { - warn "[child-$$] WARNING: interrupted select: $!\n"; - } - my $rv = fd_retrieve(\*STDIN); - #close STDIN; warn "[child-$$] sending response to local client" if $Debug > 1; @@ -210,13 +207,17 @@ sub reap_kids { if ( $kid > 0 ) { close $kids{$kid}; delete $kids{$kid}; + if ( $ftp_scan_map{$kid} ) { + delete($ftp_scan_dir{$ftp_scan_map{$kid}}); + delete($ftp_scan_map{$kid}); + } } } #warn "done reaping\n"; } sub spawn { - my $coderef = shift; + my ( $coderef, $packet ) = ( shift, shift ); unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') { use Carp; @@ -231,6 +232,7 @@ sub spawn { return; } elsif ($pid) { warn "begat $pid" if $Debug; + $ftp_scan_map{$pid} = $packet->{dir} if $coderef == \&ftp_scan; $kids{$pid} = $kid; #$kids{$pid}->autoflush; return; # I'm the parent @@ -240,7 +242,7 @@ sub spawn { # open(STDIN, "<&Client") || die "can't dup client to stdin"; # open(STDOUT, ">&Client") || die "can't dup client to stdout"; # open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr"; - exit &$coderef(); + exit &$coderef($packet); } sub _logmsg { @@ -254,6 +256,31 @@ sub _logmsg { close $log; } +sub send_and_wait { + my $packet = shift; + + warn "[child-$$] locking write stream\n" if $Debug > 1; + lock_write; + + warn "[child-$$] sending packet to remote server\n" if $Debug > 1; + nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!"; + + warn "[child-$$] flushing write stream\n" if $Debug > 1; + STDOUT->flush or die "FATAL: can't flush: $!"; + + warn "[child-$$] releasing write lock\n" if $Debug > 1; + unlock_write; + + warn "[child-$$] waiting for response from parent\n" if $Debug > 1; + my $w = new IO::Select; + $w->add(\*STDIN); + until ( $w->can_read ) { + warn "[child-$$] WARNING: interrupted select: $!\n"; + } + + fd_retrieve(\*STDIN); +} + sub lock_write { #broken on freebsd? #flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!"; @@ -270,3 +297,81 @@ sub unlock_write { flock(LOCKFILE, LOCK_UN) or die "FATAL: can't unlock $lock_file: $!"; } + +sub ftp_scan { + my $packet = shift; + + warn "[child-$$] performing ftp scan" if $Debug > 1; + + warn "[child-$$] packet received:\n". + join('', map { " $_=>$packet->{$_}\n" } keys %$packet ) + if $Debug > 2; + + $packet->{_token}=$$; + + my $dir; + $packet->{dir} =~ /^(.*)$/ && ($dir = $1); # we trust ourselves + opendir(DIR, $dir) or die "failed to open directory $dir: $!\n"; + my @files = grep(/\.csv$/, readdir(DIR)); + closedir(DIR); + + foreach my $file ( @files ) { + warn "Processing $file ...\n"; + my $csv = Text::CSV_XS->new(); + my $err = ""; + my @records = (); + open(CSV, "<$dir/$file") or die "can't open input file for $file: $!\n"; + open(RESULT, ">$dir/result/$file") + or die "can't open result file for $file: $!\n"; + + while () { + if ( $csv->parse($_) ) { + my @columns = $csv->fields(); + push(@records, \@columns); + } else { + $err = $csv->error_input; + last; + } + } + close(CSV); + if ( $err ) { + rename("$dir/$file", "$dir/rejected/$file"); + } else { + foreach my $record ( @records ) { + + $packet->{row} = $record; + $packet->{_packet} = 'Bulk/processrow'; + my $result = send_and_wait( $packet ); + + if ( $result->{error} ) { + my $name; + $record->[1] =~ /^(\w+)$/ && ( $name = $1 ); + + if ($name) { + my $filename = "$dir/rejected/$name"; + open(REC, ">$filename") or die "can't open $filename: $!\n"; + print REC join(',', @$record); + close REC or die $!; + open(ERR, ">$filename.err") or die "can't open $filename.err: $!\n"; + print ERR $result->{error}; + close ERR or die $!; + }else{ + warn "bad agent_custid"; + } + + } + print RESULT $result->{message}, "\n"; + } + + rename("$dir/$file", "$dir/processed/$file"); + warn "$file processed.\n" if $Debug; + } + close(RESULT); + } + + close STDOUT or die "FATAL: can't close write stream: $!"; #??! + + warn "[child-$$] child exiting" if $Debug > 1; + exit; + +}