add CDR batch TFTP feature, RT#3113
[freeside.git] / FS / FS / cdr.pm
1 package FS::cdr;
2
3 use strict;
4 use vars qw( @ISA @EXPORT_OK $DEBUG );
5 use Exporter;
6 use Tie::IxHash;
7 use Date::Parse;
8 use Date::Format;
9 use Time::Local;
10 use FS::UID qw( dbh );
11 use FS::Record qw( qsearch qsearchs );
12 use FS::cdr_type;
13 use FS::cdr_calltype;
14 use FS::cdr_carrier;
15 use FS::cdr_upstream_rate;
16
17 @ISA = qw(FS::Record);
18 @EXPORT_OK = qw( _cdr_date_parser_maker _cdr_min_parser_maker );
19
20 $DEBUG = 0;
21
22 =head1 NAME
23
24 FS::cdr - Object methods for cdr records
25
26 =head1 SYNOPSIS
27
28   use FS::cdr;
29
30   $record = new FS::cdr \%hash;
31   $record = new FS::cdr { 'column' => 'value' };
32
33   $error = $record->insert;
34
35   $error = $new_record->replace($old_record);
36
37   $error = $record->delete;
38
39   $error = $record->check;
40
41 =head1 DESCRIPTION
42
43 An FS::cdr object represents an Call Data Record, typically from a telephony
44 system or provider of some sort.  FS::cdr inherits from FS::Record.  The
45 following fields are currently supported:
46
47 =over 4
48
49 =item acctid - primary key
50
51 =item calldate - Call timestamp (SQL timestamp)
52
53 =item clid - Caller*ID with text
54
55 =item src - Caller*ID number / Source number
56
57 =item dst - Destination extension
58
59 =item dcontext - Destination context
60
61 =item channel - Channel used
62
63 =item dstchannel - Destination channel if appropriate
64
65 =item lastapp - Last application if appropriate
66
67 =item lastdata - Last application data
68
69 =item startdate - Start of call (UNIX-style integer timestamp)
70
71 =item answerdate - Answer time of call (UNIX-style integer timestamp)
72
73 =item enddate - End time of call (UNIX-style integer timestamp)
74
75 =item duration - Total time in system, in seconds
76
77 =item billsec - Total time call is up, in seconds
78
79 =item disposition - What happened to the call: ANSWERED, NO ANSWER, BUSY 
80
81 =item amaflags - What flags to use: BILL, IGNORE etc, specified on a per channel basis like accountcode. 
82
83 =cut
84
85   #ignore the "omit" and "documentation" AMAs??
86   #AMA = Automated Message Accounting. 
87   #default: Sets the system default. 
88   #omit: Do not record calls. 
89   #billing: Mark the entry for billing 
90   #documentation: Mark the entry for documentation.
91
92 =item accountcode - CDR account number to use: account
93
94 =item uniqueid - Unique channel identifier (Unitel/RSLCOM Event ID)
95
96 =item userfield - CDR user-defined field
97
98 =item cdr_type - CDR type - see L<FS::cdr_type> (Usage = 1, S&E = 7, OC&C = 8)
99
100 =item charged_party - Service number to be billed
101
102 =item upstream_currency - Wholesale currency from upstream
103
104 =item upstream_price - Wholesale price from upstream
105
106 =item upstream_rateplanid - Upstream rate plan ID
107
108 =item rated_price - Rated (or re-rated) price
109
110 =item distance - km (need units field?)
111
112 =item islocal - Local - 1, Non Local = 0
113
114 =item calltypenum - Type of call - see L<FS::cdr_calltype>
115
116 =item description - Description (cdr_type 7&8 only) (used for cust_bill_pkg.itemdesc)
117
118 =item quantity - Number of items (cdr_type 7&8 only)
119
120 =item carrierid - Upstream Carrier ID (see L<FS::cdr_carrier>) 
121
122 =cut
123
124 #Telstra =1, Optus = 2, RSL COM = 3
125
126 =item upstream_rateid - Upstream Rate ID
127
128 =item svcnum - Link to customer service (see L<FS::cust_svc>)
129
130 =item freesidestatus - NULL, done (or something)
131
132 =item cdrbatch
133
134 =back
135
136 =head1 METHODS
137
138 =over 4
139
140 =item new HASHREF
141
142 Creates a new CDR.  To add the CDR to the database, see L<"insert">.
143
144 Note that this stores the hash reference, not a distinct copy of the hash it
145 points to.  You can ask the object for a copy with the I<hash> method.
146
147 =cut
148
149 # the new method can be inherited from FS::Record, if a table method is defined
150
151 sub table { 'cdr'; }
152
153 =item insert
154
155 Adds this record to the database.  If there is an error, returns the error,
156 otherwise returns false.
157
158 =cut
159
160 # the insert method can be inherited from FS::Record
161
162 =item delete
163
164 Delete this record from the database.
165
166 =cut
167
168 # the delete method can be inherited from FS::Record
169
170 =item replace OLD_RECORD
171
172 Replaces the OLD_RECORD with this one in the database.  If there is an error,
173 returns the error, otherwise returns false.
174
175 =cut
176
177 # the replace method can be inherited from FS::Record
178
179 =item check
180
181 Checks all fields to make sure this is a valid CDR.  If there is
182 an error, returns the error, otherwise returns false.  Called by the insert
183 and replace methods.
184
185 Note: Unlike most types of records, we don't want to "reject" a CDR and we want
186 to process them as quickly as possible, so we allow the database to check most
187 of the data.
188
189 =cut
190
191 sub check {
192   my $self = shift;
193
194 # we don't want to "reject" a CDR like other sorts of input...
195 #  my $error = 
196 #    $self->ut_numbern('acctid')
197 ##    || $self->ut_('calldate')
198 #    || $self->ut_text('clid')
199 #    || $self->ut_text('src')
200 #    || $self->ut_text('dst')
201 #    || $self->ut_text('dcontext')
202 #    || $self->ut_text('channel')
203 #    || $self->ut_text('dstchannel')
204 #    || $self->ut_text('lastapp')
205 #    || $self->ut_text('lastdata')
206 #    || $self->ut_numbern('startdate')
207 #    || $self->ut_numbern('answerdate')
208 #    || $self->ut_numbern('enddate')
209 #    || $self->ut_number('duration')
210 #    || $self->ut_number('billsec')
211 #    || $self->ut_text('disposition')
212 #    || $self->ut_number('amaflags')
213 #    || $self->ut_text('accountcode')
214 #    || $self->ut_text('uniqueid')
215 #    || $self->ut_text('userfield')
216 #    || $self->ut_numbern('cdrtypenum')
217 #    || $self->ut_textn('charged_party')
218 ##    || $self->ut_n('upstream_currency')
219 ##    || $self->ut_n('upstream_price')
220 #    || $self->ut_numbern('upstream_rateplanid')
221 ##    || $self->ut_n('distance')
222 #    || $self->ut_numbern('islocal')
223 #    || $self->ut_numbern('calltypenum')
224 #    || $self->ut_textn('description')
225 #    || $self->ut_numbern('quantity')
226 #    || $self->ut_numbern('carrierid')
227 #    || $self->ut_numbern('upstream_rateid')
228 #    || $self->ut_numbern('svcnum')
229 #    || $self->ut_textn('freesidestatus')
230 #  ;
231 #  return $error if $error;
232
233   $self->calldate( $self->startdate_sql )
234     if !$self->calldate && $self->startdate;
235
236   unless ( $self->charged_party ) {
237     if ( $self->dst =~ /^(\+?1)?8[02-8]{2}/ ) {
238       $self->charged_party($self->dst);
239     } else {
240       $self->charged_party($self->src);
241     }
242   }
243
244   #check the foreign keys even?
245   #do we want to outright *reject* the CDR?
246   my $error =
247        $self->ut_numbern('acctid')
248
249     #Usage = 1, S&E = 7, OC&C = 8
250     || $self->ut_foreign_keyn('cdrtypenum',  'cdr_type',     'cdrtypenum' )
251
252     #the big list in appendix 2
253     || $self->ut_foreign_keyn('calltypenum', 'cdr_calltype', 'calltypenum' )
254
255     # Telstra =1, Optus = 2, RSL COM = 3
256     || $self->ut_foreign_keyn('carrierid', 'cdr_carrier', 'carrierid' )
257   ;
258   return $error if $error;
259
260   $self->SUPER::check;
261 }
262
263 =item set_status_and_rated_price STATUS [ RATED_PRICE ]
264
265 Sets the status to the provided string.  If there is an error, returns the
266 error, otherwise returns false.
267
268 =cut
269
270 sub set_status_and_rated_price {
271   my($self, $status, $rated_price) = @_;
272   $self->freesidestatus($status);
273   $self->rated_price($rated_price);
274   $self->replace();
275 }
276
277 =item calldate_unix 
278
279 Parses the calldate in SQL string format and returns a UNIX timestamp.
280
281 =cut
282
283 sub calldate_unix {
284   str2time(shift->calldate);
285 }
286
287 =item startdate_sql
288
289 Parses the startdate in UNIX timestamp format and returns a string in SQL
290 format.
291
292 =cut
293
294 sub startdate_sql {
295   my($sec,$min,$hour,$mday,$mon,$year) = localtime(shift->startdate);
296   $mon++;
297   $year += 1900;
298   "$year-$mon-$mday $hour:$min:$sec";
299 }
300
301 =item cdr_carrier
302
303 Returns the FS::cdr_carrier object associated with this CDR, or false if no
304 carrierid is defined.
305
306 =cut
307
308 my %carrier_cache = ();
309
310 sub cdr_carrier {
311   my $self = shift;
312   return '' unless $self->carrierid;
313   $carrier_cache{$self->carrierid} ||=
314     qsearchs('cdr_carrier', { 'carrierid' => $self->carrierid } );
315 }
316
317 =item carriername 
318
319 Returns the carrier name (see L<FS::cdr_carrier>), or the empty string if
320 no FS::cdr_carrier object is assocated with this CDR.
321
322 =cut
323
324 sub carriername {
325   my $self = shift;
326   my $cdr_carrier = $self->cdr_carrier;
327   $cdr_carrier ? $cdr_carrier->carriername : '';
328 }
329
330 =item cdr_calltype
331
332 Returns the FS::cdr_calltype object associated with this CDR, or false if no
333 calltypenum is defined.
334
335 =cut
336
337 my %calltype_cache = ();
338
339 sub cdr_calltype {
340   my $self = shift;
341   return '' unless $self->calltypenum;
342   $calltype_cache{$self->calltypenum} ||=
343     qsearchs('cdr_calltype', { 'calltypenum' => $self->calltypenum } );
344 }
345
346 =item calltypename 
347
348 Returns the call type name (see L<FS::cdr_calltype>), or the empty string if
349 no FS::cdr_calltype object is assocated with this CDR.
350
351 =cut
352
353 sub calltypename {
354   my $self = shift;
355   my $cdr_calltype = $self->cdr_calltype;
356   $cdr_calltype ? $cdr_calltype->calltypename : '';
357 }
358
359 =item cdr_upstream_rate
360
361 Returns the upstream rate mapping (see L<FS::cdr_upstream_rate>), or the empty
362 string if no FS::cdr_upstream_rate object is associated with this CDR.
363
364 =cut
365
366 sub cdr_upstream_rate {
367   my $self = shift;
368   return '' unless $self->upstream_rateid;
369   qsearchs('cdr_upstream_rate', { 'upstream_rateid' => $self->upstream_rateid })
370     or '';
371 }
372
373 =item _convergent_format COLUMN [ COUNTRYCODE ]
374
375 Returns the number in COLUMN formatted as follows:
376
377 If the country code does not match COUNTRYCODE (default "61"), it is returned
378 unchanged.
379
380 If the country code does match COUNTRYCODE (default "61"), it is removed.  In
381 addiiton, "0" is prepended unless the number starts with 13, 18 or 19. (???)
382
383 =cut
384
385 sub _convergent_format {
386   my( $self, $field ) = ( shift, shift );
387   my $countrycode = scalar(@_) ? shift : '61'; #+61 = australia
388   #my $number = $self->$field();
389   my $number = $self->get($field);
390   #if ( $number =~ s/^(\+|011)$countrycode// ) {
391   if ( $number =~ s/^\+$countrycode// ) {
392     $number = "0$number"
393       unless $number =~ /^1[389]/; #???
394   }
395   $number;
396 }
397
398 =item downstream_csv [ OPTION => VALUE, ... ]
399
400 =cut
401
402 my %export_names = (
403   'convergent'      => {},
404   'simple'  => { 'name'           => 'Simple',
405                  'invoice_header' =>
406                      "Date,Time,Name,Destination,Duration,Price",
407                },
408   'simple2' => { 'name'           => 'Simple with source',
409                  'invoice_header' =>
410                      #"Date,Time,Name,Called From,Destination,Duration,Price",
411                      "Date,Time,Called From,Destination,Duration,Price",
412                },
413 );
414
415 my %export_formats = (
416   'convergent' => [
417     'carriername', #CARRIER
418     sub { shift->_convergent_format('src') }, #SERVICE_NUMBER
419     sub { shift->_convergent_format('charged_party') }, #CHARGED_NUMBER
420     sub { time2str('%Y-%m-%d', shift->calldate_unix ) }, #DATE
421     sub { time2str('%T',       shift->calldate_unix ) }, #TIME
422     'billsec', #'duration', #DURATION
423     sub { shift->_convergent_format('dst') }, #NUMBER_DIALED
424     '', #XXX add (from prefixes in most recent email) #FROM_DESC
425     '', #XXX add (from prefixes in most recent email) #TO_DESC
426     'calltypename', #CLASS_CODE
427     'rated_price', #PRICE
428     sub { shift->rated_price ? 'Y' : 'N' }, #RATED
429     '', #OTHER_INFO
430   ],
431   'simple' => [
432     sub { time2str('%D', shift->calldate_unix ) },   #DATE
433     sub { time2str('%r', shift->calldate_unix ) },   #TIME
434     'userfield',                                     #USER
435     'dst',                                           #NUMBER_DIALED
436     sub { sprintf('%.2fm', shift->billsec / 60 ) },  #DURATION
437     sub { sprintf('%.3f', shift->upstream_price ) }, #PRICE
438   ],
439   'simple2' => [
440     sub { time2str('%D', shift->calldate_unix ) },   #DATE
441     sub { time2str('%r', shift->calldate_unix ) },   #TIME
442     #'userfield',                                     #USER
443     'dst',                                           #NUMBER_DIALED
444     'src',                                           #called from
445     sub { sprintf('%.2fm', shift->billsec / 60 ) },  #DURATION
446     sub { sprintf('%.3f', shift->upstream_price ) }, #PRICE
447   ],
448 );
449
450 sub downstream_csv {
451   my( $self, %opt ) = @_;
452
453   my $format = $opt{'format'}; # 'convergent';
454   return "Unknown format $format" unless exists $export_formats{$format};
455
456   eval "use Text::CSV_XS;";
457   die $@ if $@;
458   my $csv = new Text::CSV_XS;
459
460   my @columns =
461     map {
462           ref($_) ? &{$_}($self) : $self->$_();
463         }
464     @{ $export_formats{$format} };
465
466   my $status = $csv->combine(@columns);
467   die "FS::CDR: error combining ". $csv->error_input(). "into downstream CSV"
468     unless $status;
469
470   $csv->string;
471
472 }
473
474 =back
475
476 =head1 CLASS METHODS
477
478 =over 4
479
480 =item invoice_formats
481
482 Returns an ordered list of key value pairs containing invoice format names
483 as keys (for use with part_pkg::voip_cdr) and "pretty" format names as values.
484
485 =cut
486
487 sub invoice_formats {
488   map { ($_ => $export_names{$_}->{'name'}) }
489     grep { $export_names{$_}->{'invoice_header'} }
490     keys %export_names;
491 }
492
493 =item invoice_header FORMAT
494
495 Returns a scalar containing the CSV column header for invoice format FORMAT.
496
497 =cut
498
499 sub invoice_header {
500   my $format = shift;
501   $export_names{$format}->{'invoice_header'};
502 }
503
504 =item import_formats
505
506 Returns an ordered list of key value pairs containing import format names
507 as keys (for use with batch_import) and "pretty" format names as values.
508
509 =cut
510
511 #false laziness w/part_pkg & part_export
512
513 my %cdr_info;
514 foreach my $INC ( @INC ) {
515   warn "globbing $INC/FS/cdr/*.pm\n" if $DEBUG;
516   foreach my $file ( glob("$INC/FS/cdr/*.pm") ) {
517     warn "attempting to load CDR format info from $file\n" if $DEBUG;
518     $file =~ /\/(\w+)\.pm$/ or do {
519       warn "unrecognized file in $INC/FS/cdr/: $file\n";
520       next;
521     };
522     my $mod = $1;
523     my $info = eval "use FS::cdr::$mod; ".
524                     "\\%FS::cdr::$mod\::info;";
525     if ( $@ ) {
526       die "error using FS::cdr::$mod (skipping): $@\n" if $@;
527       next;
528     }
529     unless ( keys %$info ) {
530       warn "no %info hash found in FS::cdr::$mod, skipping\n";
531       next;
532     }
533     warn "got CDR format info from FS::cdr::$mod: $info\n" if $DEBUG;
534     if ( exists($info->{'disabled'}) && $info->{'disabled'} ) {
535       warn "skipping disabled CDR format FS::cdr::$mod" if $DEBUG;
536       next;
537     }
538     $cdr_info{$mod} = $info;
539   }
540 }
541
542 tie my %import_formats, 'Tie::IxHash',
543   map  { $_ => $cdr_info{$_}->{'name'} }
544   sort { $cdr_info{$a}->{'weight'} <=> $cdr_info{$b}->{'weight'} }
545   grep { exists($cdr_info{$_}->{'import_fields'}) }
546   keys %cdr_info;
547
548 sub import_formats {
549   %import_formats;
550 }
551
552 sub _cdr_min_parser_maker {
553   my $field = shift;
554   my @fields = ref($field) ? @$field : ($field);
555   @fields = qw( billsec duration ) unless scalar(@fields);
556   return sub {
557     my( $cdr, $min ) = @_;
558     my $sec = eval { _cdr_min_parse($min) };
559     die "error parsing seconds for @fields from $min minutes: $@\n" if $@;
560     $cdr->$_($sec) foreach @fields;
561   };
562 }
563
564 sub _cdr_min_parse {
565   my $min = shift;
566   sprintf('%.0f', $min * 60 );
567 }
568
569 sub _cdr_date_parser_maker {
570   my $field = shift;
571   return sub {
572     my( $cdr, $date ) = @_;
573     #$cdr->$field( _cdr_date_parse($date) );
574     eval { $cdr->$field( _cdr_date_parse($date) ); };
575     die "error parsing date for $field from $date: $@\n" if $@;
576   };
577 }
578
579 sub _cdr_date_parse {
580   my $date = shift;
581
582   return '' unless length($date); #that's okay, it becomes NULL
583
584   my($year, $mon, $day, $hour, $min, $sec);
585
586   #$date =~ /^\s*(\d{4})[\-\/]\(\d{1,2})[\-\/](\d{1,2})\s+(\d{1,2}):(\d{1,2}):(\d{1,2})\s*$/
587   #taqua  #2007-10-31 08:57:24.113000000
588
589   if ( $date =~ /^\s*(\d{4})\D(\d{1,2})\D(\d{1,2})\s+(\d{1,2})\D(\d{1,2})\D(\d{1,2})(\D|$)/ ) {
590     ($year, $mon, $day, $hour, $min, $sec) = ( $1, $2, $3, $4, $5, $6 );
591   } elsif ( $date  =~ /^\s*(\d{1,2})\D(\d{1,2})\D(\d{4})\s+(\d{1,2})\D(\d{1,2})\D(\d{1,2})(\D|$)/ ) {
592     ($mon, $day, $year, $hour, $min, $sec) = ( $1, $2, $3, $4, $5, $6 );
593   } else {
594      die "unparsable date: $date"; #maybe we shouldn't die...
595   }
596
597   return '' if $year == 1900 && $mon == 1 && $day == 1
598             && $hour == 0    && $min == 0 && $sec == 0;
599
600   timelocal($sec, $min, $hour, $day, $mon-1, $year);
601 }
602
603 =item batch_import HASHREF
604
605 Imports CDR records.  Available options are:
606
607 =over 4
608
609 =item filehandle
610
611 =item format
612
613 =back
614
615 =cut
616
617 sub batch_import {
618   my $param = shift;
619
620   my $fh = $param->{filehandle};
621   my $format = $param->{format};
622   my $cdrbatch = $param->{cdrbatch};
623
624   return "Unknown format $format"
625     unless exists( $cdr_info{$format} )
626         && exists( $cdr_info{$format}->{'import_fields'} );
627
628   my $info = $cdr_info{$format};
629
630   my $type = exists($info->{'type'}) ? lc($info->{'type'}) : 'csv';
631
632   my $parser;
633   if ( $type eq 'csv' ) {
634     eval "use Text::CSV_XS;";
635     die $@ if $@;
636     $parser = new Text::CSV_XS;
637   } elsif ( $type eq 'fixedlength' ) {
638     eval "use Parse::FixedLength;";
639     die $@ if $@;
640     $parser = new Parse::FixedLength $info->{'fixedlength_format'};
641   } else {
642     die "Unknown CDR format type $type for format $format\n";
643   }
644
645   my $imported = 0;
646   #my $columns;
647
648   local $SIG{HUP} = 'IGNORE';
649   local $SIG{INT} = 'IGNORE';
650   local $SIG{QUIT} = 'IGNORE';
651   local $SIG{TERM} = 'IGNORE';
652   local $SIG{TSTP} = 'IGNORE';
653   local $SIG{PIPE} = 'IGNORE';
654
655   my $oldAutoCommit = $FS::UID::AutoCommit;
656   local $FS::UID::AutoCommit = 0;
657   my $dbh = dbh;
658
659   my $header_lines = exists($info->{'header'}) ? $info->{'header'} : 0;
660
661   my $line;
662   while ( defined($line=<$fh>) ) {
663
664     next if $header_lines-- > 0; #&& $line =~ /^[\w, "]+$/ 
665
666     my @columns = ();
667     if ( $type eq 'csv' ) {
668
669       $parser->parse($line) or do {
670         $dbh->rollback if $oldAutoCommit;
671         return "can't parse: ". $parser->error_input();
672       };
673
674       @columns = $parser->fields();
675
676     } elsif ( $type eq 'fixedlength' ) {
677
678       @columns = $parser->parse($line);
679
680     } else {
681       die "Unknown CDR format type $type for format $format\n";
682     }
683
684     #warn join('-',@columns);
685
686     if ( $format eq 'simple' ) { #should be a callback or opt in FS::cdr::simple
687       @columns = map { s/^ +//; $_; } @columns;
688     }
689
690     my @later = ();
691     my %cdr =
692       map {
693
694         my $field_or_sub = $_;
695         if ( ref($field_or_sub) ) {
696           push @later, $field_or_sub, shift(@columns);
697           ();
698         } else {
699           ( $field_or_sub => shift @columns );
700         }
701
702       }
703       @{ $info->{'import_fields'} }
704     ;
705  
706     $cdr{cdrbatch} = $cdrbatch;
707
708     my $cdr = new FS::cdr ( \%cdr );
709
710     while ( scalar(@later) ) {
711       my $sub = shift @later;
712       my $data = shift @later;
713       &{$sub}($cdr, $data);  # $cdr->&{$sub}($data); 
714     }
715
716     if ( $format eq 'taqua' ) { #should be a callback or opt in FS::cdr::taqua
717       if ( $cdr->enddate && $cdr->startdate  ) { #a bit more?
718         $cdr->duration( $cdr->enddate - $cdr->startdate  );
719       }
720       if ( $cdr->enddate && $cdr->answerdate ) { #a bit more?
721         $cdr->billsec(  $cdr->enddate - $cdr->answerdate );
722       } 
723     }
724
725     my $error = $cdr->insert;
726     if ( $error ) {
727       $dbh->rollback if $oldAutoCommit;
728       return $error;
729
730       #or just skip?
731       #next;
732     }
733
734     $imported++;
735   }
736
737   $dbh->commit or die $dbh->errstr if $oldAutoCommit;
738
739   #might want to disable this if we skip records for any reason...
740   return "Empty file!" unless $imported || $param->{empty_ok};
741
742   '';
743
744 }
745
746 =back
747
748 =head1 BUGS
749
750 =head1 SEE ALSO
751
752 L<FS::Record>, schema.html from the base documentation.
753
754 =cut
755
756 1;
757