summaryrefslogtreecommitdiff
path: root/bin/cdr-opensips.import
blob: b8169d5236f44f708e666c15a48c947455eb1114 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/perl

use strict;
use vars qw( $DEBUG );
use Date::Parse 'str2time';
use Date::Format 'time2str';
use FS::UID qw(adminsuidsetup dbh);
use FS::cdr;
use DBI;
use Getopt::Std;

my %opt;
getopts('H:U:P:D:T:s:e:c:', \%opt);
my $user = shift or die &usage;

my $dsn = 'dbi:mysql';
$dsn .= ":database=$opt{D}" if $opt{D};
$dsn .= ":host=$opt{H}" if $opt{H};

my $mysql = DBI->connect($dsn, $opt{U}, $opt{P}) 
  or die $DBI::errstr;

my ($start, $end) = ('', '');
if ( $opt{s} ) {
  $start = str2time($opt{s}) or die "can't parse start date $opt{s}\n";
  $start = time2str('%Y-%m-%d', $start);
}
if ( $opt{e} ) {
  $end = str2time($opt{e}) or die "can't parse end date $opt{e}\n";
  $end = time2str('%Y-%m-%d', $end);
}

adminsuidsetup $user;

my $fsdbh = FS::UID::dbh;

# check for existence of freesidestatus
my $table = $opt{T} || 'acc';
my $status = $mysql->selectall_arrayref("SHOW COLUMNS FROM $table WHERE Field = 'freesidestatus'");
if( ! @$status ) {
  print "Adding freesidestatus column...\n";
  $mysql->do("ALTER TABLE $table ADD COLUMN freesidestatus varchar(32)")
    or die $mysql->errstr;
}
else {
  print "freesidestatus column present\n";
}

my @cols = ( qw( 
  id caller_id callee_id method from_tag to_tag callid sip_code sip_reason 
  time )
);

my $sql = 'SELECT '.join(',', @cols). " FROM $table".
  ' WHERE freesidestatus IS NULL' .
  ' AND sip_code = 200 ' . # only want successful calls
  ($start && " AND time >= '$start'") .
  ($end   && " AND time <  '$end'") .
  ' ORDER BY time'; # should ensure INVITE/ACK/BYE order
my $sth = $mysql->prepare($sql);
$sth->execute;
print "Importing ".$sth->rows." records...\n";

my $cdr_batch = new FS::cdr_batch({ 
    'cdrbatch' => 'mysql-import-'. time2str('%Y/%m/%d-%T',time),
  });
my $error = $cdr_batch->insert;
die $error if $error;
my $cdrbatchnum = $cdr_batch->cdrbatchnum;
my $imports = 0;
my $updates = 0;

my %cdrs;
my $row;
while ( $row = $sth->fetchrow_hashref ) {
  my ($callid) = $row->{'callid'};
  $callid =~ s/@.*//;
  if ( !$callid ) {
    warn $row->{'time'} . ": no callid, skipped.\n";
    next;
  }

  #i guess now we're NANPA-centric, but at least we warn on non-numeric numbers
  my $src = '';
  my $src_ip = '';
  if ( $row->{'caller_id'} =~ /^sip:(\+1?)?(\w+)@(.*)/ ) {
    $src = $2;
    my $rest = $3;
    if ($rest =~ /^([\d\.]{7,15})/) {
      # canonicalize it so that ascii sort order works
      $src_ip = sprintf('%03d.%03d.%03d.%03d', split('\.', $1));
    }
  } else {
    warn "unparseable caller_id ". $row->{'caller_id'}. "\n";
  }

  my $dst = '';
  my $dst_ip = '';
  if ( $row->{'callee_id'} =~ /^sip:(\+1?)?(\w+)@(.*)/ ) {
    $dst = $2;
    my $rest = $3;
    if ($rest =~ /^([\d\.]{7,15})/) {
      $dst_ip = sprintf('%03d.%03d.%03d.%03d', split('\.', $1));
    }
  } else {
    warn "unparseable callee_id ". $row->{'callee_id'}. "\n";
  }

  my $cdr = $cdrs{$callid};
  if ( !$cdr ) {
    $cdr = $cdrs{$callid} = FS::cdr->new ({
      uniqueid    => $callid,
      cdrbatchnum => $cdrbatchnum,
    });
    $cdr->cdrtypenum($opt{c}) if $opt{c};
  }
  my $date = str2time($row->{'time'});
  if ( $row->{'method'} eq 'INVITE' ) {
    $cdr->startdate($date);
    $cdr->src($src);
    $cdr->dst($dst);
    $cdr->src_ip_addr($src_ip);
    $cdr->dst_ip_addr($dst_ip);
  }
  elsif ( $row->{'method'} eq 'ACK' ) {
    $cdr->answerdate($date);
    next if !check_cdr($cdr, $src, $dst);
  }
  elsif ( $row->{'method'} eq 'BYE' ) {
    $cdr->enddate($date);
    next if !check_cdr($cdr, $src, $dst);
  }
  if ( $cdr->startdate and $cdr->answerdate and $cdr->enddate ) {
    $cdr->duration($cdr->enddate - $cdr->startdate);
    $cdr->billsec($cdr->enddate - $cdr->answerdate);
    my $error = $cdr->insert;
    if($error) {
      print "failed import: $error\n";
    }
    else {
      $imports++;
      if( $updates += $mysql->do("UPDATE $table SET freesidestatus = 'done' 
          WHERE sip_code = 200 AND callid = ?",
          undef,
          $row->{'callid'}
        ) ) { #nothing
      }
      else {
        print "failed to set status: ".$mysql->errstr."\n";
      }
      delete $cdrs{$callid};
    }
  }
}
print "Done.\nImported $imports CDRs, marked $updates accounting events as done.\n";
if ( keys(%cdrs) ) {
  print "Skipped ".scalar(keys(%cdrs))." incomplete calls.\n";
}
$mysql->disconnect;

sub usage {
  "Usage: \n  cdr-opensips.import\n\t[ -H host ]\n\t-D database\n\t-U user\n\t-P password\n\t[ -s start ] [ -e end ] [ -c cdrtypenum ] \n\tfreesideuser\n";
}

sub check_cdr {
  # Verify that these records belong to the same call.
  # BYE records sometimes have the caller/callee fields swapped.
  # We allow empty src/dst so as not to make noise about incomplete calls. If 
  # this check fails, something is wrong with the source data.
  my ($cdr, $a, $b) = @_;
  if ( ( $cdr->src and $cdr->src ne $a and $cdr->src ne $b )
    or ( $cdr->dst and $cdr->dst ne $a and $cdr->dst ne $b ) ) {
    warn $cdr->uniqueid . ": src/dst mismatch, skipped.\n";
    return 0;
  }
  return 1;
}