additional QIS/Taqua-specific CDR handling details, RT#3838
[freeside.git] / FS / FS / cdr.pm
1 package FS::cdr;
2
3 use strict;
4 use vars qw( @ISA @EXPORT_OK $DEBUG );
5 use Exporter;
6 use Tie::IxHash;
7 use Date::Parse;
8 use Date::Format;
9 use Time::Local;
10 use FS::UID qw( dbh );
11 use FS::Record qw( qsearch qsearchs );
12 use FS::cdr_type;
13 use FS::cdr_calltype;
14 use FS::cdr_carrier;
15 use FS::cdr_upstream_rate;
16
17 @ISA = qw(FS::Record);
18 @EXPORT_OK = qw( _cdr_date_parser_maker _cdr_min_parser_maker );
19
20 $DEBUG = 0;
21
22 =head1 NAME
23
24 FS::cdr - Object methods for cdr records
25
26 =head1 SYNOPSIS
27
28   use FS::cdr;
29
30   $record = new FS::cdr \%hash;
31   $record = new FS::cdr { 'column' => 'value' };
32
33   $error = $record->insert;
34
35   $error = $new_record->replace($old_record);
36
37   $error = $record->delete;
38
39   $error = $record->check;
40
41 =head1 DESCRIPTION
42
43 An FS::cdr object represents an Call Data Record, typically from a telephony
44 system or provider of some sort.  FS::cdr inherits from FS::Record.  The
45 following fields are currently supported:
46
47 =over 4
48
49 =item acctid - primary key
50
51 =item calldate - Call timestamp (SQL timestamp)
52
53 =item clid - Caller*ID with text
54
55 =item src - Caller*ID number / Source number
56
57 =item dst - Destination extension
58
59 =item dcontext - Destination context
60
61 =item channel - Channel used
62
63 =item dstchannel - Destination channel if appropriate
64
65 =item lastapp - Last application if appropriate
66
67 =item lastdata - Last application data
68
69 =item startdate - Start of call (UNIX-style integer timestamp)
70
71 =item answerdate - Answer time of call (UNIX-style integer timestamp)
72
73 =item enddate - End time of call (UNIX-style integer timestamp)
74
75 =item duration - Total time in system, in seconds
76
77 =item billsec - Total time call is up, in seconds
78
79 =item disposition - What happened to the call: ANSWERED, NO ANSWER, BUSY 
80
81 =item amaflags - What flags to use: BILL, IGNORE etc, specified on a per channel basis like accountcode. 
82
83 =cut
84
85   #ignore the "omit" and "documentation" AMAs??
86   #AMA = Automated Message Accounting. 
87   #default: Sets the system default. 
88   #omit: Do not record calls. 
89   #billing: Mark the entry for billing 
90   #documentation: Mark the entry for documentation.
91
92 =item accountcode - CDR account number to use: account
93
94 =item uniqueid - Unique channel identifier (Unitel/RSLCOM Event ID)
95
96 =item userfield - CDR user-defined field
97
98 =item cdr_type - CDR type - see L<FS::cdr_type> (Usage = 1, S&E = 7, OC&C = 8)
99
100 =item charged_party - Service number to be billed
101
102 =item upstream_currency - Wholesale currency from upstream
103
104 =item upstream_price - Wholesale price from upstream
105
106 =item upstream_rateplanid - Upstream rate plan ID
107
108 =item rated_price - Rated (or re-rated) price
109
110 =item distance - km (need units field?)
111
112 =item islocal - Local - 1, Non Local = 0
113
114 =item calltypenum - Type of call - see L<FS::cdr_calltype>
115
116 =item description - Description (cdr_type 7&8 only) (used for cust_bill_pkg.itemdesc)
117
118 =item quantity - Number of items (cdr_type 7&8 only)
119
120 =item carrierid - Upstream Carrier ID (see L<FS::cdr_carrier>) 
121
122 =cut
123
124 #Telstra =1, Optus = 2, RSL COM = 3
125
126 =item upstream_rateid - Upstream Rate ID
127
128 =item svcnum - Link to customer service (see L<FS::cust_svc>)
129
130 =item freesidestatus - NULL, done (or something)
131
132 =item cdrbatch
133
134 =back
135
136 =head1 METHODS
137
138 =over 4
139
140 =item new HASHREF
141
142 Creates a new CDR.  To add the CDR to the database, see L<"insert">.
143
144 Note that this stores the hash reference, not a distinct copy of the hash it
145 points to.  You can ask the object for a copy with the I<hash> method.
146
147 =cut
148
149 # the new method can be inherited from FS::Record, if a table method is defined
150
151 sub table { 'cdr'; }
152
153 =item insert
154
155 Adds this record to the database.  If there is an error, returns the error,
156 otherwise returns false.
157
158 =cut
159
160 # the insert method can be inherited from FS::Record
161
162 =item delete
163
164 Delete this record from the database.
165
166 =cut
167
168 # the delete method can be inherited from FS::Record
169
170 =item replace OLD_RECORD
171
172 Replaces the OLD_RECORD with this one in the database.  If there is an error,
173 returns the error, otherwise returns false.
174
175 =cut
176
177 # the replace method can be inherited from FS::Record
178
179 =item check
180
181 Checks all fields to make sure this is a valid CDR.  If there is
182 an error, returns the error, otherwise returns false.  Called by the insert
183 and replace methods.
184
185 Note: Unlike most types of records, we don't want to "reject" a CDR and we want
186 to process them as quickly as possible, so we allow the database to check most
187 of the data.
188
189 =cut
190
191 sub check {
192   my $self = shift;
193
194 # we don't want to "reject" a CDR like other sorts of input...
195 #  my $error = 
196 #    $self->ut_numbern('acctid')
197 ##    || $self->ut_('calldate')
198 #    || $self->ut_text('clid')
199 #    || $self->ut_text('src')
200 #    || $self->ut_text('dst')
201 #    || $self->ut_text('dcontext')
202 #    || $self->ut_text('channel')
203 #    || $self->ut_text('dstchannel')
204 #    || $self->ut_text('lastapp')
205 #    || $self->ut_text('lastdata')
206 #    || $self->ut_numbern('startdate')
207 #    || $self->ut_numbern('answerdate')
208 #    || $self->ut_numbern('enddate')
209 #    || $self->ut_number('duration')
210 #    || $self->ut_number('billsec')
211 #    || $self->ut_text('disposition')
212 #    || $self->ut_number('amaflags')
213 #    || $self->ut_text('accountcode')
214 #    || $self->ut_text('uniqueid')
215 #    || $self->ut_text('userfield')
216 #    || $self->ut_numbern('cdrtypenum')
217 #    || $self->ut_textn('charged_party')
218 ##    || $self->ut_n('upstream_currency')
219 ##    || $self->ut_n('upstream_price')
220 #    || $self->ut_numbern('upstream_rateplanid')
221 ##    || $self->ut_n('distance')
222 #    || $self->ut_numbern('islocal')
223 #    || $self->ut_numbern('calltypenum')
224 #    || $self->ut_textn('description')
225 #    || $self->ut_numbern('quantity')
226 #    || $self->ut_numbern('carrierid')
227 #    || $self->ut_numbern('upstream_rateid')
228 #    || $self->ut_numbern('svcnum')
229 #    || $self->ut_textn('freesidestatus')
230 #  ;
231 #  return $error if $error;
232
233   $self->calldate( $self->startdate_sql )
234     if !$self->calldate && $self->startdate;
235
236   unless ( $self->charged_party ) {
237     if ( $self->dst =~ /^(\+?1)?8[02-8]{2}/ ) {
238       $self->charged_party($self->dst);
239     } else {
240       $self->charged_party($self->src);
241     }
242   }
243
244   #check the foreign keys even?
245   #do we want to outright *reject* the CDR?
246   my $error =
247        $self->ut_numbern('acctid')
248
249   #add a config option to turn these back on if someone needs 'em
250   #
251   #  #Usage = 1, S&E = 7, OC&C = 8
252   #  || $self->ut_foreign_keyn('cdrtypenum',  'cdr_type',     'cdrtypenum' )
253   #
254   #  #the big list in appendix 2
255   #  || $self->ut_foreign_keyn('calltypenum', 'cdr_calltype', 'calltypenum' )
256   #
257   #  # Telstra =1, Optus = 2, RSL COM = 3
258   #  || $self->ut_foreign_keyn('carrierid', 'cdr_carrier', 'carrierid' )
259   ;
260   return $error if $error;
261
262   $self->SUPER::check;
263 }
264
265 =item set_status_and_rated_price STATUS [ RATED_PRICE ]
266
267 Sets the status to the provided string.  If there is an error, returns the
268 error, otherwise returns false.
269
270 =cut
271
272 sub set_status_and_rated_price {
273   my($self, $status, $rated_price) = @_;
274   $self->freesidestatus($status);
275   $self->rated_price($rated_price);
276   $self->replace();
277 }
278
279 =item calldate_unix 
280
281 Parses the calldate in SQL string format and returns a UNIX timestamp.
282
283 =cut
284
285 sub calldate_unix {
286   str2time(shift->calldate);
287 }
288
289 =item startdate_sql
290
291 Parses the startdate in UNIX timestamp format and returns a string in SQL
292 format.
293
294 =cut
295
296 sub startdate_sql {
297   my($sec,$min,$hour,$mday,$mon,$year) = localtime(shift->startdate);
298   $mon++;
299   $year += 1900;
300   "$year-$mon-$mday $hour:$min:$sec";
301 }
302
303 =item cdr_carrier
304
305 Returns the FS::cdr_carrier object associated with this CDR, or false if no
306 carrierid is defined.
307
308 =cut
309
310 my %carrier_cache = ();
311
312 sub cdr_carrier {
313   my $self = shift;
314   return '' unless $self->carrierid;
315   $carrier_cache{$self->carrierid} ||=
316     qsearchs('cdr_carrier', { 'carrierid' => $self->carrierid } );
317 }
318
319 =item carriername 
320
321 Returns the carrier name (see L<FS::cdr_carrier>), or the empty string if
322 no FS::cdr_carrier object is assocated with this CDR.
323
324 =cut
325
326 sub carriername {
327   my $self = shift;
328   my $cdr_carrier = $self->cdr_carrier;
329   $cdr_carrier ? $cdr_carrier->carriername : '';
330 }
331
332 =item cdr_calltype
333
334 Returns the FS::cdr_calltype object associated with this CDR, or false if no
335 calltypenum is defined.
336
337 =cut
338
339 my %calltype_cache = ();
340
341 sub cdr_calltype {
342   my $self = shift;
343   return '' unless $self->calltypenum;
344   $calltype_cache{$self->calltypenum} ||=
345     qsearchs('cdr_calltype', { 'calltypenum' => $self->calltypenum } );
346 }
347
348 =item calltypename 
349
350 Returns the call type name (see L<FS::cdr_calltype>), or the empty string if
351 no FS::cdr_calltype object is assocated with this CDR.
352
353 =cut
354
355 sub calltypename {
356   my $self = shift;
357   my $cdr_calltype = $self->cdr_calltype;
358   $cdr_calltype ? $cdr_calltype->calltypename : '';
359 }
360
361 =item cdr_upstream_rate
362
363 Returns the upstream rate mapping (see L<FS::cdr_upstream_rate>), or the empty
364 string if no FS::cdr_upstream_rate object is associated with this CDR.
365
366 =cut
367
368 sub cdr_upstream_rate {
369   my $self = shift;
370   return '' unless $self->upstream_rateid;
371   qsearchs('cdr_upstream_rate', { 'upstream_rateid' => $self->upstream_rateid })
372     or '';
373 }
374
375 =item _convergent_format COLUMN [ COUNTRYCODE ]
376
377 Returns the number in COLUMN formatted as follows:
378
379 If the country code does not match COUNTRYCODE (default "61"), it is returned
380 unchanged.
381
382 If the country code does match COUNTRYCODE (default "61"), it is removed.  In
383 addiiton, "0" is prepended unless the number starts with 13, 18 or 19. (???)
384
385 =cut
386
387 sub _convergent_format {
388   my( $self, $field ) = ( shift, shift );
389   my $countrycode = scalar(@_) ? shift : '61'; #+61 = australia
390   #my $number = $self->$field();
391   my $number = $self->get($field);
392   #if ( $number =~ s/^(\+|011)$countrycode// ) {
393   if ( $number =~ s/^\+$countrycode// ) {
394     $number = "0$number"
395       unless $number =~ /^1[389]/; #???
396   }
397   $number;
398 }
399
400 =item downstream_csv [ OPTION => VALUE, ... ]
401
402 =cut
403
404 my %export_names = (
405   'convergent'      => {},
406   'simple'  => { 'name'           => 'Simple',
407                  'invoice_header' =>
408                      "Date,Time,Name,Destination,Duration,Price",
409                },
410   'simple2' => { 'name'           => 'Simple with source',
411                  'invoice_header' =>
412                      #"Date,Time,Name,Called From,Destination,Duration,Price",
413                      "Date,Time,Called From,Destination,Duration,Price",
414                },
415 );
416
417 my %export_formats = (
418   'convergent' => [
419     'carriername', #CARRIER
420     sub { shift->_convergent_format('src') }, #SERVICE_NUMBER
421     sub { shift->_convergent_format('charged_party') }, #CHARGED_NUMBER
422     sub { time2str('%Y-%m-%d', shift->calldate_unix ) }, #DATE
423     sub { time2str('%T',       shift->calldate_unix ) }, #TIME
424     'billsec', #'duration', #DURATION
425     sub { shift->_convergent_format('dst') }, #NUMBER_DIALED
426     '', #XXX add (from prefixes in most recent email) #FROM_DESC
427     '', #XXX add (from prefixes in most recent email) #TO_DESC
428     'calltypename', #CLASS_CODE
429     'rated_price', #PRICE
430     sub { shift->rated_price ? 'Y' : 'N' }, #RATED
431     '', #OTHER_INFO
432   ],
433   'simple' => [
434     sub { time2str('%D', shift->calldate_unix ) },   #DATE
435     sub { time2str('%r', shift->calldate_unix ) },   #TIME
436     'userfield',                                     #USER
437     'dst',                                           #NUMBER_DIALED
438     sub { sprintf('%.2fm', shift->billsec / 60 ) },  #DURATION
439     sub { sprintf('%.3f', shift->upstream_price ) }, #PRICE
440   ],
441   'simple2' => [
442     sub { time2str('%D', shift->calldate_unix ) },   #DATE
443     sub { time2str('%r', shift->calldate_unix ) },   #TIME
444     #'userfield',                                     #USER
445     'dst',                                           #NUMBER_DIALED
446     'src',                                           #called from
447     sub { sprintf('%.2fm', shift->billsec / 60 ) },  #DURATION
448     sub { sprintf('%.3f', shift->upstream_price ) }, #PRICE
449   ],
450 );
451
452 sub downstream_csv {
453   my( $self, %opt ) = @_;
454
455   my $format = $opt{'format'}; # 'convergent';
456   return "Unknown format $format" unless exists $export_formats{$format};
457
458   eval "use Text::CSV_XS;";
459   die $@ if $@;
460   my $csv = new Text::CSV_XS;
461
462   my @columns =
463     map {
464           ref($_) ? &{$_}($self) : $self->$_();
465         }
466     @{ $export_formats{$format} };
467
468   my $status = $csv->combine(@columns);
469   die "FS::CDR: error combining ". $csv->error_input(). "into downstream CSV"
470     unless $status;
471
472   $csv->string;
473
474 }
475
476 =back
477
478 =head1 CLASS METHODS
479
480 =over 4
481
482 =item invoice_formats
483
484 Returns an ordered list of key value pairs containing invoice format names
485 as keys (for use with part_pkg::voip_cdr) and "pretty" format names as values.
486
487 =cut
488
489 sub invoice_formats {
490   map { ($_ => $export_names{$_}->{'name'}) }
491     grep { $export_names{$_}->{'invoice_header'} }
492     keys %export_names;
493 }
494
495 =item invoice_header FORMAT
496
497 Returns a scalar containing the CSV column header for invoice format FORMAT.
498
499 =cut
500
501 sub invoice_header {
502   my $format = shift;
503   $export_names{$format}->{'invoice_header'};
504 }
505
506 =item import_formats
507
508 Returns an ordered list of key value pairs containing import format names
509 as keys (for use with batch_import) and "pretty" format names as values.
510
511 =cut
512
513 #false laziness w/part_pkg & part_export
514
515 my %cdr_info;
516 foreach my $INC ( @INC ) {
517   warn "globbing $INC/FS/cdr/*.pm\n" if $DEBUG;
518   foreach my $file ( glob("$INC/FS/cdr/*.pm") ) {
519     warn "attempting to load CDR format info from $file\n" if $DEBUG;
520     $file =~ /\/(\w+)\.pm$/ or do {
521       warn "unrecognized file in $INC/FS/cdr/: $file\n";
522       next;
523     };
524     my $mod = $1;
525     my $info = eval "use FS::cdr::$mod; ".
526                     "\\%FS::cdr::$mod\::info;";
527     if ( $@ ) {
528       die "error using FS::cdr::$mod (skipping): $@\n" if $@;
529       next;
530     }
531     unless ( keys %$info ) {
532       warn "no %info hash found in FS::cdr::$mod, skipping\n";
533       next;
534     }
535     warn "got CDR format info from FS::cdr::$mod: $info\n" if $DEBUG;
536     if ( exists($info->{'disabled'}) && $info->{'disabled'} ) {
537       warn "skipping disabled CDR format FS::cdr::$mod" if $DEBUG;
538       next;
539     }
540     $cdr_info{$mod} = $info;
541   }
542 }
543
544 tie my %import_formats, 'Tie::IxHash',
545   map  { $_ => $cdr_info{$_}->{'name'} }
546   sort { $cdr_info{$a}->{'weight'} <=> $cdr_info{$b}->{'weight'} }
547   grep { exists($cdr_info{$_}->{'import_fields'}) }
548   keys %cdr_info;
549
550 sub import_formats {
551   %import_formats;
552 }
553
554 sub _cdr_min_parser_maker {
555   my $field = shift;
556   my @fields = ref($field) ? @$field : ($field);
557   @fields = qw( billsec duration ) unless scalar(@fields);
558   return sub {
559     my( $cdr, $min ) = @_;
560     my $sec = eval { _cdr_min_parse($min) };
561     die "error parsing seconds for @fields from $min minutes: $@\n" if $@;
562     $cdr->$_($sec) foreach @fields;
563   };
564 }
565
566 sub _cdr_min_parse {
567   my $min = shift;
568   sprintf('%.0f', $min * 60 );
569 }
570
571 sub _cdr_date_parser_maker {
572   my $field = shift;
573   return sub {
574     my( $cdr, $date ) = @_;
575     #$cdr->$field( _cdr_date_parse($date) );
576     eval { $cdr->$field( _cdr_date_parse($date) ); };
577     die "error parsing date for $field from $date: $@\n" if $@;
578   };
579 }
580
581 sub _cdr_date_parse {
582   my $date = shift;
583
584   return '' unless length($date); #that's okay, it becomes NULL
585
586   my($year, $mon, $day, $hour, $min, $sec);
587
588   #$date =~ /^\s*(\d{4})[\-\/]\(\d{1,2})[\-\/](\d{1,2})\s+(\d{1,2}):(\d{1,2}):(\d{1,2})\s*$/
589   #taqua  #2007-10-31 08:57:24.113000000
590
591   if ( $date =~ /^\s*(\d{4})\D(\d{1,2})\D(\d{1,2})\s+(\d{1,2})\D(\d{1,2})\D(\d{1,2})(\D|$)/ ) {
592     ($year, $mon, $day, $hour, $min, $sec) = ( $1, $2, $3, $4, $5, $6 );
593   } elsif ( $date  =~ /^\s*(\d{1,2})\D(\d{1,2})\D(\d{4})\s+(\d{1,2})\D(\d{1,2})\D(\d{1,2})(\D|$)/ ) {
594     ($mon, $day, $year, $hour, $min, $sec) = ( $1, $2, $3, $4, $5, $6 );
595   } else {
596      die "unparsable date: $date"; #maybe we shouldn't die...
597   }
598
599   return '' if $year == 1900 && $mon == 1 && $day == 1
600             && $hour == 0    && $min == 0 && $sec == 0;
601
602   timelocal($sec, $min, $hour, $day, $mon-1, $year);
603 }
604
605 =item batch_import HASHREF
606
607 Imports CDR records.  Available options are:
608
609 =over 4
610
611 =item filehandle
612
613 =item format
614
615 =back
616
617 =cut
618
619 sub batch_import {
620   my $param = shift;
621
622   my $fh = $param->{filehandle};
623   my $format = $param->{format};
624   my $cdrbatch = $param->{cdrbatch};
625
626   return "Unknown format $format"
627     unless exists( $cdr_info{$format} )
628         && exists( $cdr_info{$format}->{'import_fields'} );
629
630   my $info = $cdr_info{$format};
631
632   my $type = exists($info->{'type'}) ? lc($info->{'type'}) : 'csv';
633
634   my $parser;
635   if ( $type eq 'csv' ) {
636     eval "use Text::CSV_XS;";
637     die $@ if $@;
638     $parser = new Text::CSV_XS;
639   } elsif ( $type eq 'fixedlength' ) {
640     eval "use Parse::FixedLength;";
641     die $@ if $@;
642     $parser = new Parse::FixedLength $info->{'fixedlength_format'};
643   } else {
644     die "Unknown CDR format type $type for format $format\n";
645   }
646
647   my $imported = 0;
648   #my $columns;
649
650   local $SIG{HUP} = 'IGNORE';
651   local $SIG{INT} = 'IGNORE';
652   local $SIG{QUIT} = 'IGNORE';
653   local $SIG{TERM} = 'IGNORE';
654   local $SIG{TSTP} = 'IGNORE';
655   local $SIG{PIPE} = 'IGNORE';
656
657   my $oldAutoCommit = $FS::UID::AutoCommit;
658   local $FS::UID::AutoCommit = 0;
659   my $dbh = dbh;
660
661   my $header_lines = exists($info->{'header'}) ? $info->{'header'} : 0;
662
663   my $line;
664   while ( defined($line=<$fh>) ) {
665
666     next if $header_lines-- > 0; #&& $line =~ /^[\w, "]+$/ 
667
668     my @columns = ();
669     if ( $type eq 'csv' ) {
670
671       $parser->parse($line) or do {
672         $dbh->rollback if $oldAutoCommit;
673         return "can't parse: ". $parser->error_input();
674       };
675
676       @columns = $parser->fields();
677
678     } elsif ( $type eq 'fixedlength' ) {
679
680       @columns = $parser->parse($line);
681
682     } else {
683       die "Unknown CDR format type $type for format $format\n";
684     }
685
686     #warn join('-',@columns);
687
688     if ( $format eq 'simple' ) { #should be a callback or opt in FS::cdr::simple
689       @columns = map { s/^ +//; $_; } @columns;
690     }
691
692     my @later = ();
693     my %cdr =
694       map {
695
696         my $field_or_sub = $_;
697         if ( ref($field_or_sub) ) {
698           push @later, $field_or_sub, shift(@columns);
699           ();
700         } else {
701           ( $field_or_sub => shift @columns );
702         }
703
704       }
705       @{ $info->{'import_fields'} }
706     ;
707  
708     $cdr{cdrbatch} = $cdrbatch;
709
710     my $cdr = new FS::cdr ( \%cdr );
711
712     while ( scalar(@later) ) {
713       my $sub = shift @later;
714       my $data = shift @later;
715       &{$sub}($cdr, $data);  # $cdr->&{$sub}($data); 
716     }
717
718     if ( $format eq 'taqua' ) { #should be a callback or opt in FS::cdr::taqua
719       if ( $cdr->enddate && $cdr->startdate  ) { #a bit more?
720         $cdr->duration( $cdr->enddate - $cdr->startdate  );
721       }
722       if ( $cdr->enddate && $cdr->answerdate ) { #a bit more?
723         $cdr->billsec(  $cdr->enddate - $cdr->answerdate );
724       } 
725     }
726
727     my $error = $cdr->insert;
728     if ( $error ) {
729       $dbh->rollback if $oldAutoCommit;
730       return $error;
731
732       #or just skip?
733       #next;
734     }
735
736     $imported++;
737   }
738
739   $dbh->commit or die $dbh->errstr if $oldAutoCommit;
740
741   #might want to disable this if we skip records for any reason...
742   return "Empty file!" unless $imported || $param->{empty_ok};
743
744   '';
745
746 }
747
748 =back
749
750 =head1 BUGS
751
752 =head1 SEE ALSO
753
754 L<FS::Record>, schema.html from the base documentation.
755
756 =cut
757
758 1;
759