71513: Card tokenization [upgrade implemented]
[freeside.git] / bin / cdr-opensips.import
1 #!/usr/bin/perl
2
3 use strict;
4 use vars qw( $DEBUG );
5 use Date::Parse 'str2time';
6 use Date::Format 'time2str';
7 use FS::UID qw(adminsuidsetup dbh);
8 use FS::cdr;
9 use DBI;
10 use Getopt::Std;
11
12 my %opt;
13 getopts('H:U:P:D:T:s:e:c:', \%opt);
14 my $user = shift or die &usage;
15
16 my $dsn = 'dbi:mysql';
17 $dsn .= ":database=$opt{D}" if $opt{D};
18 $dsn .= ":host=$opt{H}" if $opt{H};
19
20 my $mysql = DBI->connect($dsn, $opt{U}, $opt{P}) 
21   or die $DBI::errstr;
22
23 my ($start, $end) = ('', '');
24 if ( $opt{s} ) {
25   $start = str2time($opt{s}) or die "can't parse start date $opt{s}\n";
26   $start = time2str('%Y-%m-%d', $start);
27 }
28 if ( $opt{e} ) {
29   $end = str2time($opt{e}) or die "can't parse end date $opt{e}\n";
30   $end = time2str('%Y-%m-%d', $end);
31 }
32
33 adminsuidsetup $user;
34
35 my $fsdbh = FS::UID::dbh;
36
37 # check for existence of freesidestatus
38 my $table = $opt{T} || 'acc';
39 my $status = $mysql->selectall_arrayref("SHOW COLUMNS FROM $table WHERE Field = 'freesidestatus'");
40 if( ! @$status ) {
41   print "Adding freesidestatus column...\n";
42   $mysql->do("ALTER TABLE $table ADD COLUMN freesidestatus varchar(32)")
43     or die $mysql->errstr;
44 }
45 else {
46   print "freesidestatus column present\n";
47 }
48
49 my @cols = ( qw( 
50   id caller_id callee_id method from_tag to_tag callid sip_code sip_reason 
51   time )
52 );
53
54 my $sql = 'SELECT '.join(',', @cols). " FROM $table".
55   ' WHERE freesidestatus IS NULL' .
56   ' AND sip_code = 200 ' . # only want successful calls
57   ($start && " AND time >= '$start'") .
58   ($end   && " AND time <  '$end'") .
59   ' ORDER BY time'; # should ensure INVITE/ACK/BYE order
60 my $sth = $mysql->prepare($sql);
61 $sth->execute;
62 print "Importing ".$sth->rows." records...\n";
63
64 my $cdr_batch = new FS::cdr_batch({ 
65     'cdrbatch' => 'mysql-import-'. time2str('%Y/%m/%d-%T',time),
66   });
67 my $error = $cdr_batch->insert;
68 die $error if $error;
69 my $cdrbatchnum = $cdr_batch->cdrbatchnum;
70 my $imports = 0;
71 my $updates = 0;
72
73 my %cdrs;
74 my $row;
75 while ( $row = $sth->fetchrow_hashref ) {
76   my ($callid) = $row->{'callid'};
77   $callid =~ s/@.*//;
78   if ( !$callid ) {
79     warn $row->{'time'} . ": no callid, skipped.\n";
80     next;
81   }
82
83   #i guess now we're NANPA-centric, but at least we warn on non-numeric numbers
84   my $src = '';
85   my $src_ip = '';
86   if ( $row->{'caller_id'} =~ /^sip:(\+1?)?(\w+)@(.*)/ ) {
87     $src = $2;
88     my $rest = $3;
89     if ($rest =~ /^([\d\.]{7,15})/) {
90       # canonicalize it so that ascii sort order works
91       $src_ip = sprintf('%03d.%03d.%03d.%03d', split('\.', $1));
92     }
93   } else {
94     warn "unparseable caller_id ". $row->{'caller_id'}. "\n";
95   }
96
97   my $dst = '';
98   my $dst_ip = '';
99   if ( $row->{'callee_id'} =~ /^sip:(\+1?)?(\w+)@(.*)/ ) {
100     $dst = $2;
101     my $rest = $3;
102     if ($rest =~ /^([\d\.]{7,15})/) {
103       $dst_ip = sprintf('%03d.%03d.%03d.%03d', split('\.', $1));
104     }
105   } else {
106     warn "unparseable callee_id ". $row->{'callee_id'}. "\n";
107   }
108
109   my $cdr = $cdrs{$callid};
110   if ( !$cdr ) {
111     $cdr = $cdrs{$callid} = FS::cdr->new ({
112       uniqueid    => $callid,
113       cdrbatchnum => $cdrbatchnum,
114     });
115     $cdr->cdrtypenum($opt{c}) if $opt{c};
116   }
117   my $date = str2time($row->{'time'});
118   if ( $row->{'method'} eq 'INVITE' ) {
119     $cdr->startdate($date);
120     $cdr->src($src);
121     $cdr->dst($dst);
122     $cdr->src_ip_addr($src_ip);
123     $cdr->dst_ip_addr($dst_ip);
124   }
125   elsif ( $row->{'method'} eq 'ACK' ) {
126     $cdr->answerdate($date);
127     next if !check_cdr($cdr, $src, $dst);
128   }
129   elsif ( $row->{'method'} eq 'BYE' ) {
130     $cdr->enddate($date);
131     next if !check_cdr($cdr, $src, $dst);
132   }
133   if ( $cdr->startdate and $cdr->answerdate and $cdr->enddate ) {
134     $cdr->duration($cdr->enddate - $cdr->startdate);
135     $cdr->billsec($cdr->enddate - $cdr->answerdate);
136     my $error = $cdr->insert;
137     if($error) {
138       print "failed import: $error\n";
139     }
140     else {
141       $imports++;
142       if( $updates += $mysql->do("UPDATE $table SET freesidestatus = 'done' 
143           WHERE sip_code = 200 AND callid = ?",
144           undef,
145           $row->{'callid'}
146         ) ) { #nothing
147       }
148       else {
149         print "failed to set status: ".$mysql->errstr."\n";
150       }
151       delete $cdrs{$callid};
152     }
153   }
154 }
155 print "Done.\nImported $imports CDRs, marked $updates accounting events as done.\n";
156 if ( keys(%cdrs) ) {
157   print "Skipped ".scalar(keys(%cdrs))." incomplete calls.\n";
158 }
159 $mysql->disconnect;
160
161 sub usage {
162   "Usage: \n  cdr-opensips.import\n\t[ -H host ]\n\t-D database\n\t-U user\n\t-P password\n\t[ -s start ] [ -e end ] [ -c cdrtypenum ] \n\tfreesideuser\n";
163 }
164
165 sub check_cdr {
166   # Verify that these records belong to the same call.
167   # BYE records sometimes have the caller/callee fields swapped.
168   # We allow empty src/dst so as not to make noise about incomplete calls. If 
169   # this check fails, something is wrong with the source data.
170   my ($cdr, $a, $b) = @_;
171   if ( ( $cdr->src and $cdr->src ne $a and $cdr->src ne $b )
172     or ( $cdr->dst and $cdr->dst ne $a and $cdr->dst ne $b ) ) {
173     warn $cdr->uniqueid . ": src/dst mismatch, skipped.\n";
174     return 0;
175   }
176   return 1;
177 }