improve wholesale SS UI, RT10883
[freeside.git] / fs_selfservice / FS-SelfService / freeside-selfservice-clientd
index 0c25c34..0819d9d 100644 (file)
@@ -5,36 +5,48 @@
 # This is run REMOTELY over ssh by freeside-selfservice-server
 
 use strict;
-use subs qw(spawn logmsg);
+use subs qw(spawn logmsg lock_write unlock_write);
 use Fcntl qw(:flock);
 use POSIX qw(:sys_wait_h);
 use Socket;
-use Storable qw(nstore_fd fd_retrieve);
+use Storable 2.09 qw(nstore_fd fd_retrieve);
 use IO::Handle qw(_IONBF);
 use IO::Select;
 use IO::File;
+use Text::CSV_XS;
 
-STDOUT->setbuf('');
+#STDOUT->setbuf('');
+
+my $tag = scalar(@ARGV) ? '.'.shift : '';
 
 use vars qw( $Debug );
-$Debug = 3; #2 will turn on child logging, 3 will log packet contents,
-            #including potentially compromising information
+$Debug = 2; #2 will turn on child logging
+            #3 will log packet contents,#including passwords
+            #4 will log receipts of all packets from server including
+            #  keepalives (big!)
 
-my $socket = "/usr/local/freeside/selfservice_socket";
+my $socket = "/usr/local/freeside/selfservice_socket$tag";
 my $pid_file = "$socket.pid";
 
-my $log_file = "/usr/local/freeside/selfservice.log";
+my $log_file = "/usr/local/freeside/selfservice$tag.log";
+
+my $lock_file = "/usr/local/freeside/selfservice$tag.writelock";
 
 #my $me = '[client]';
 
 $|=1;
 
 $SIG{__WARN__} = \&_logmsg;
+#$SIG{__DIE__} = sub { &_logmsg(@_); exit };
 
 #read data to be cached or something
 #warn "$me Reading init data\n" if $Debug;
 #my $signup_init = 
 
+warn "Creating $lock_file\n" if $Debug;
+open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
+close LOCKFILE;
+
 warn "Creating $socket\n" if $Debug;
 my $uaddr = sockaddr_un($socket);
 my $proto = getprotobyname('tcp');
@@ -47,8 +59,9 @@ if ( -e $pid_file ) {
   open(PIDFILE,"<$pid_file");
   my $old_pid = <PIDFILE>;
   close PIDFILE;
-  $old_pid =~ /^(\d+)$/;
-  kill 'TERM', $1;
+  if ( $old_pid =~ /^(\d+)$/ ) {
+    kill 'TERM', $1;
+  }
 }
 open(PIDFILE,">$pid_file");
 print PIDFILE "$$\n";
@@ -58,9 +71,14 @@ close PIDFILE;
 #sub REAPER { $waitedpid = wait; $SIG{CHLD} = \&REAPER; }
 #$SIG{CHLD} =  \&REAPER;
 
+warn "enabling keep alives\n" if $Debug;
+nstore_fd( { _packet => '_enable_keepalive' } , \*STDOUT );
+
 warn "entering main loop\n" if $Debug;
 
 my %kids;
+my %ftp_scan_dir;
+my %ftp_scan_map;
 
 my $s = new IO::Select;
 $s->add(\*STDIN);
@@ -94,17 +112,34 @@ while (1) {
 
     if ( $handle == \*STDIN ) {
 
-      warn "receiving packet from server\n" if $Debug;
+      warn "receiving packet from server\n" if $Debug > 3;
 
       my $packet = fd_retrieve(\*STDIN);
       my $token = $packet->{'_token'};
+
+      if ( $token eq '_keepalive' ) {
+        $undisp = 1;
+        next;
+      }
+
       warn "received packet from server with token $token\n".
            ( $Debug > 2
              ? join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
              : '' )
         if $Debug;
 
-     if ( exists($kids{$token}) ) {
+      if ( $token eq '_ftp_scan' ) {
+        if ( $ftp_scan_dir{$packet->{dir}} ) {
+          warn "already processing ". $packet->{dir}. "\n" if $Debug;
+        } else {
+          $ftp_scan_dir{$packet->{dir}} = 1;
+          spawn \&ftp_scan, $packet;
+        }
+        $undisp = 1;
+        next;
+      }
+
+      if ( exists($kids{$token}) ) {
         warn "sending return packet to $token via $kids{$token}\n"
           if $Debug;
         nstore_fd($packet, $kids{$token});
@@ -138,20 +173,10 @@ while (1) {
         #handle some commands weirdly?
         $packet->{_token}=$$;
 
-        warn "[child-$$] sending packet to remote server" if $Debug > 1;
-        flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!";
-        nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!";
-        STDOUT->flush or die "FATAL: can't flush: $!";
-        flock(STDOUT, LOCK_UN) or die "FATAL: can't release write lock: $!";
-        close STDOUT or die "FATAL: can't close write stream: $!"; #??!
+        my $rv = send_and_wait( $packet );
 
-        warn "[child-$$] waiting for response from parent" if $Debug > 1;
-        my $w = new IO::Select;
-        $w->add(\*STDIN);
-        until ( $w->can_read ) {
-          warn "[child-$$] WARNING: interrupted select: $!\n";
-        }
-        my $rv = fd_retrieve(\*STDIN);
+        warn "[child-$$] closing write stream\n" if $Debug > 1;
+        close STDOUT or die "FATAL: can't close write stream: $!"; #??!
 
         #close STDIN;
 
@@ -182,13 +207,17 @@ sub reap_kids {
     if ( $kid > 0 ) {
       close $kids{$kid};
       delete $kids{$kid};
+      if ( $ftp_scan_map{$kid} ) {
+        delete($ftp_scan_dir{$ftp_scan_map{$kid}});
+        delete($ftp_scan_map{$kid});
+      }
     }
   }
   #warn "done reaping\n";
 }
 
 sub spawn {
-    my $coderef = shift;
+    my ( $coderef, $packet ) = ( shift, shift );
 
     unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') {
         use Carp;
@@ -203,6 +232,7 @@ sub spawn {
         return;
     } elsif ($pid) {
         warn "begat $pid" if $Debug;
+        $ftp_scan_map{$pid} = $packet->{dir} if $coderef == \&ftp_scan;
         $kids{$pid} = $kid;
         #$kids{$pid}->autoflush;
         return; # I'm the parent
@@ -212,15 +242,136 @@ sub spawn {
 #    open(STDIN,  "<&Client")   || die "can't dup client to stdin";
 #    open(STDOUT, ">&Client")   || die "can't dup client to stdout";
 #     open(STDERR, ">&STDOUT") || die "can't dup stdout to stderr";
-    exit &$coderef();
+    exit &$coderef($packet);
 }
 
 sub _logmsg {
   chomp( my $msg = shift );
   my $log = new IO::File ">>$log_file";
+  die "can't open $log_file: $!" unless defined($log);
   flock($log, LOCK_EX);
   seek($log, 0, 2);
   print $log "[client] [". scalar(localtime). "] [$$] $msg\n";
   flock($log, LOCK_UN);
   close $log;
 }
+
+sub send_and_wait {
+  my $packet = shift;
+
+  warn "[child-$$] locking write stream\n" if $Debug > 1;
+  lock_write;
+
+  warn "[child-$$] sending packet to remote server\n" if $Debug > 1;
+  nstore_fd($packet, \*STDOUT) or die "FATAL: can't send response: $!";
+        
+  warn "[child-$$] flushing write stream\n" if $Debug > 1;
+  STDOUT->flush or die "FATAL: can't flush: $!";
+        
+  warn "[child-$$] releasing write lock\n" if $Debug > 1;
+  unlock_write;
+
+  warn "[child-$$] waiting for response from parent\n" if $Debug > 1;
+  my $w = new IO::Select;
+  $w->add(\*STDIN);
+  until ( $w->can_read ) {
+    warn "[child-$$] WARNING: interrupted select: $!\n";
+  }
+
+  fd_retrieve(\*STDIN);
+}
+
+sub lock_write {
+  #broken on freebsd?
+  #flock(STDOUT, LOCK_EX) or die "FATAL: can't lock write stream: $!";
+
+  #open a new one for each kid to get a unique lock
+  open(LOCKFILE,">$lock_file") or die "can't open $lock_file: $!";
+
+  flock(LOCKFILE, LOCK_EX) or die "FATAL: can't lock $lock_file: $!";
+}
+
+sub unlock_write {
+  #broken on freebsd?
+  #flock(STDOUT, LOCK_UN) or die "FATAL: can't release write lock: $!";
+
+  flock(LOCKFILE, LOCK_UN) or die "FATAL: can't unlock $lock_file: $!";
+}
+
+sub ftp_scan {
+  my $packet = shift;
+
+  warn "[child-$$] performing ftp scan" if $Debug > 1;
+
+  warn "[child-$$] packet received:\n".
+       join('', map { " $_=>$packet->{$_}\n" } keys %$packet )
+    if $Debug > 2;
+
+  $packet->{_token}=$$;
+
+  my $dir;
+  $packet->{dir} =~ /^(.*)$/ && ($dir = $1); # we trust ourselves
+  opendir(DIR, $dir) or die "failed to open directory $dir: $!\n";
+  my @files = grep(/\.csv$/, readdir(DIR));
+  closedir(DIR);
+
+  foreach my $file ( @files ) {
+    warn "Processing $file ...\n";
+    my $csv = Text::CSV_XS->new();
+    my $err = "";
+    my @records = ();
+    open(CSV, "<$dir/$file") or die "can't open input file for $file: $!\n";
+    open(RESULT, ">$dir/result/$file")
+      or die "can't open result file for $file: $!\n";
+
+    while (<CSV>) {
+      if ( $csv->parse($_) ) {
+        my @columns = $csv->fields();
+        push(@records, \@columns);
+      } else {
+        $err = $csv->error_input;
+        last;
+      }
+    }
+    close(CSV);
+    if ( $err ) {
+      rename("$dir/$file", "$dir/rejected/$file");
+    } else {
+      foreach my $record ( @records ) {
+
+        $packet->{row} = $record;
+        $packet->{_packet} = 'Bulk/processrow';
+        my $result = send_and_wait( $packet );
+
+        if ( $result->{error} ) {
+          my $name;
+          $record->[1] =~ /^(\w+)$/ && ( $name = $1 );
+
+          if ($name) {
+            my $filename = "$dir/rejected/$name";
+            open(REC, ">$filename") or die "can't open $filename: $!\n";
+            print REC join(',', @$record);
+            close REC or die $!;
+            open(ERR, ">$filename.err") or die "can't open $filename.err: $!\n";
+            print ERR $result->{error};
+            close ERR or die $!;
+          }else{
+            warn "bad agent_custid";
+          }
+
+        }
+        print RESULT $result->{message}, "\n";
+      }
+
+      rename("$dir/$file", "$dir/processed/$file");
+      warn "$file processed.\n" if $Debug;
+    }
+    close(RESULT);
+  }
+
+  close STDOUT or die "FATAL: can't close write stream: $!"; #??!
+
+  warn "[child-$$] child exiting" if $Debug > 1;
+  exit;
+
+}