RT# 76398 - Added auto status field or status table creation
[freeside.git] / FS / FS / cdr / Import.pm
1 package FS::cdr::Import;
2
3 use strict;
4 use Date::Format 'time2str';
5 use FS::UID qw(adminsuidsetup dbh);
6 use FS::cdr;
7 use DBI;
8 use Getopt::Std;
9
10 use vars qw( $DEBUG );
11 $DEBUG = 0;
12
13 =head1 NAME
14
15 FS::cdr::Import - CDR importing
16
17 =head1 SYNOPSIS
18
19   use FS::cdr::Import;
20
21   FS::cdr::Import->dbi_import(
22     'dbd'         => 'Pg', #mysql, Sybase, etc.
23     'table'       => 'TABLE_NAME',
24     'primary_key' => 'BILLING_ID',
25     'column_map'  => { #freeside => remote_db
26       'freeside_column' => 'remote_db_column',
27       'freeside_column' => sub { my $row = shift; $row->{remote_db_column}; },
28     },
29   );
30
31 =head1 DESCRIPTION
32
33 CDR importing
34
35 =head1 CLASS METHODS
36
37 =item dbi_import
38
39 =cut
40
41 sub dbi_import {
42   my $class = shift;
43   my %args = @_; #args are specifed by the script using this sub
44
45   my %opt; #opt is specified for each install / run of the script
46   getopts('H:U:P:D:T:c:L:S:', \%opt);
47
48   my $user = shift(@ARGV) or die $class->cli_usage;
49   my $database = $opt{D} || $args{database};
50   my $table = $opt{T} || $args{table};
51   my $pkey = $args{primary_key};
52   my $pkey_info = $args{primary_key_info} ? $args{primary_key_info} : 'BIGINT';
53   my $status_table = $opt{S} || $args{status_table};
54   my $dbd_type = $args{'dbd'} ? $args{'dbd'} : 'Pg';
55   my $status_column = $args{status_column} ? $args{status_column} : 'freesidestatus';
56   my $status_column_info = $args{status_column_info} ? $args{status_column} : 'VARCHAR(32)';
57
58   my $queries = get_queries({
59     'dbd'                 => $dbd_type,
60     'table'               => $table,
61     'status_column'       => $status_column,
62     'status_column_info'  => $status_column_info,
63     'status_table'        => $status_table,
64     'primary_key'         => $pkey,
65     'primary_key_info'    => $pkey_info,
66   });
67
68   my $dsn = 'dbi:'. $dbd_type;
69   $dsn .= $queries->{connect_type} . "=$opt{H}";
70   $dsn .= ";database=$database" if $database;
71
72   my $dbi = DBI->connect($dsn, $opt{U}, $opt{P}) 
73     or die $DBI::errstr;
74
75   adminsuidsetup $user;
76
77   ## check for status table if using. if not there create it.
78   if ($status_table) {
79     my $status = $dbi->selectall_arrayref( $queries->{check_statustable} );
80     if( ! @$status ) {
81       print "Adding status table $status_table ...\n";
82       $dbi->do( $queries->{create_statustable} )
83         or die $dbi->errstr;
84     }
85   }
86   ## check for column freeside status if not using status table and create it if not there.
87   else {
88     my $status = $dbi->selectall_arrayref( $queries->{check_statuscolumn} );
89     if( ! @$status ) {
90       print "Adding $status_column column...\n";
91       $dbi->do( $queries->{create_statuscolumn} )
92         or die $dbi->errstr;
93     }
94   }
95
96   #my @cols = values %{ $args{column_map} };
97   my $sql = "SELECT $table.* FROM $table "; # join(',', @cols). " FROM $table ".
98   $sql .=  "LEFT JOIN $status_table ON ( $table.$pkey = $status_table.$pkey ) "
99     if $status_table;
100   $sql .= "WHERE  $status_column IS NULL ";
101
102   #my @cols = values %{ $args{column_map} };
103   my $sql = "SELECT $table.* FROM $table "; # join(',', @cols). " FROM $table ".
104   $sql .=  'LEFT JOIN '. $args{status_table}.
105            " ON ( $table.$pkey = ". $args{status_table}. ".$pkey )"
106     if $args{status_table};
107   $sql .= ' WHERE freesidestatus IS NULL ';
108
109   #$sql .= ' LIMIT '. $opt{L} if $opt{L};
110   my $sth = $dbi->prepare($sql);
111   $sth->execute or die $sth->errstr. " executing $sql";
112
113   my $cdr_batch = new FS::cdr_batch({ 
114       'cdrbatch' => 'IVR-import-'. time2str('%Y/%m/%d-%T',time),
115     });
116   my $error = $cdr_batch->insert;
117   die $error if $error;
118   my $cdrbatchnum = $cdr_batch->cdrbatchnum;
119   my $imported = 0;
120
121   my $row;
122   while ( $row = $sth->fetchrow_hashref ) {
123
124     my %hash = ( 'cdrbatchnum' => $cdrbatchnum );
125     foreach my $field ( keys %{ $args{column_map} } ) {
126       my $col_or_coderef = $args{column_map}->{$field};
127       if ( ref($col_or_coderef) eq 'CODE' ) {
128         $hash{$field} = &{ $col_or_coderef }( $row );
129       } else {
130         $hash{$field} = $row->{ $col_or_coderef };
131       }
132       $hash{$field} = '' if $hash{$field} =~ /^\s+$/; #IVR (MSSQL?) bs
133     }
134
135     my $cdr = FS::cdr->new(\%hash);
136
137     $cdr->cdrtypenum($opt{c}) if $opt{c};
138
139     my $pkey_value = $row->{$pkey};
140
141     #print "$pkey_value\n" if $opt{v};
142     my $error = $cdr->insert;
143
144     if ($error) {
145
146       #die "$pkey_value: failed import: $error\n";
147       print "$pkey_value: failed import: $error\n";
148
149     } else {
150
151       $imported++;
152
153       my $st_sql;
154       if ( $args{status_table} ) {
155
156         $st_sql = 
157           'INSERT INTO '. $status_table. " ( $pkey, $status_column ) ".
158             " VALUES ( ?, 'done' )";
159
160       } else {
161
162         $st_sql = "UPDATE $table SET $status_column = 'done' WHERE $pkey = ?";
163
164       }
165
166       my $updated = $dbi->do($st_sql, undef, $pkey_value );
167       #$updates += $updated;
168       die "failed to set status: ".$dbi->errstr."\n" unless $updated;
169
170     }
171
172     if ( $opt{L} && $imported >= $opt{L} ) {
173       $sth->finish;
174       last;
175     }
176
177   }
178   print "Done.\n";
179   print "Imported $imported CDRs.\n" if $imported;
180
181   $dbi->disconnect;
182
183 }
184
185 sub cli_usage {
186   #"Usage: \n  $0\n\t[ -H hostname ]\n\t-D database\n\t-U user\n\t-P password\n\tfreesideuser\n";
187   #"Usage: \n  $0\n\t-H hostname\n\t-D database\n\t-U user\n\t-P password\n\t[ -c cdrtypenum ]\n\tfreesideuser\n";
188   "Usage: \n  $0\n\t-H hostname\n\t[ -D database ]\n\t-U user\n\t-P password\n\t[ -c cdrtypenum ]\n\t[ -L num_cdrs_limit ]\n\tfreesideuser\n";
189 }
190
191 sub get_queries {
192   #my ($dbd, $table, $column, $column_create_info, $status_table, $primary_key, $primary_key_info) = @_;
193   my $info = shift;
194
195   #do we want to add more types? or add as we go?
196   my %dbi_connect_types = (
197     'Sybase'  => ':server',
198     'Pg'      => ':host',
199   );
200
201   #Check for freeside status table Sybase has not been tested
202   my %dbi_check_statustable = (
203     'Sybase'  => "SELECT systables.name FROM sysobjects
204                   JOIN systables ON sysobjects.id = systables.id
205                   WHERE sysobjects.name LIKE '$info->{table}' AND systables.name = $info->{status_table}",
206     'Pg'      => "SELECT * FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '$info->{status_table}' AND column_name = '$info->{status_column}'",
207   );
208
209   #Check for freeside status table Sybase has not been tested
210   my %dbi_create_statustable = (
211     'Sybase'  => "CREATE TABLE $info->{status_table} ( $info->{primary_key} $info->{primary_key_info}, $info->{status_column} $info->{status_column_info} )",
212     'Pg'      => "CREATE TABLE $info->{status_table} ( $info->{primary_key} $info->{primary_key_info}, $info->{status_column} $info->{status_column_info} )",
213   );
214
215   #Check for freeside status column Sybase has not been tested
216   my %dbi_check_statuscolumn = (
217     'Sybase'  => "SELECT syscolumns.name FROM sysobjects
218                   JOIN syscolumns ON sysobjects.id = syscolumns.id
219                   WHERE sysobjects.name LIKE '$info->{table}' AND syscolumns.name = $info->{status_column}",
220     'Pg'      => "SELECT * FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '$info->{table}' AND column_name = '$info->{status_column}' ",
221   );
222
223     #Check for freeside status column Sybase has not been tested
224   my %dbi_create_statuscolumn = (
225     'Sybase'  => "ALTER TABLE $info->{table} ADD COLUMN $info->{status_column} $info->{status_column_info}",
226     'Pg'      => "ALTER TABLE $info->{table} ADD COLUMN $info->{status_column} $info->{status_column_info}",
227   );
228
229   my $queries = {
230     'connect_type'         =>  $dbi_connect_types{$info->{dbd}},
231     'check_statustable'    =>  $dbi_check_statustable{$info->{dbd}},
232     'create_statustable'   =>  $dbi_create_statustable{$info->{dbd}},
233     'check_statuscolumn'   =>  $dbi_check_statuscolumn{$info->{dbd}},
234     'create_statuscolumn'  =>  $dbi_create_statuscolumn{$info->{dbd}},
235   };
236
237   return $queries;
238 }
239
240 =head1 BUGS
241
242 This has only been test with Pg -> postgresql databases
243
244 Sparse documentation.
245
246 =head1 SEE ALSO
247
248 L<FS::cdr>
249
250 =cut
251
252 1;