RT# 82942 Replace DBI->connect() with FS::DBI->connect()
[freeside.git] / FS / FS / cdr / Import.pm
1 package FS::cdr::Import;
2
3 use strict;
4 use Date::Format 'time2str';
5 use FS::UID qw(adminsuidsetup dbh);
6 use FS::cdr;
7 use FS::DBI;
8 use Getopt::Std;
9
10 use vars qw( $DEBUG );
11 $DEBUG = 0;
12
13 =head1 NAME
14
15 FS::cdr::Import - CDR importing
16
17 =head1 SYNOPSIS
18
19   use FS::cdr::Import;
20
21   FS::cdr::Import->dbi_import(
22     'dbd'                => 'Pg', #mysql, Sybase, etc.
23     'database'           => 'DATABASE_NAME',
24     'table'              => 'TABLE_NAME',,
25     'status_table'       => 'STATUS_TABLE_NAME', # if using a table rather than field in main table
26     'primary_key'        => 'BILLING_ID',
27     'primary_key_info'   => 'BIGINT', # defaults to bigint
28     'status_column'      => 'STATUS_COLUMN_NAME', # defaults to freesidestatus
29     'status_column_info' => 'varchar(32)', # defaults to varchar(32)
30     'column_map'         => { #freeside => remote_db
31       'freeside_column'    => 'remote_db_column',
32       'freeside_column'    => sub { my $row = shift; $row->{remote_db_column}; },
33     },
34     'batch_name'         => 'batch_name', # cdr_batch name -import-date gets appended.
35   );
36
37 =head1 DESCRIPTION
38
39 CDR importing
40
41 =head1 CLASS METHODS
42
43 =item dbi_import
44
45 =cut
46
47 sub dbi_import {
48   my $class = shift;
49   my %args = @_; #args are specifed by the script using this sub
50
51   my %opt; #opt is specified for each install / run of the script
52   getopts('H:U:P:D:T:c:L:S:', \%opt);
53
54   my $user = shift(@ARGV) or die $class->cli_usage;
55   my $database = $opt{D} || $args{database};
56   my $table = $opt{T} || $args{table};
57   my $pkey = $args{primary_key};
58   my $pkey_info = $args{primary_key_info} ? $args{primary_key_info} : 'BIGINT';
59   my $status_table = $opt{S} || $args{status_table};
60   my $dbd_type = $args{'dbd'} ? $args{'dbd'} : 'Pg';
61   my $status_column = $args{status_column} ? $args{status_column} : 'freesidestatus';
62   my $status_column_info = $args{status_column_info} ? $args{status_column} : 'VARCHAR(32)';
63   my $st_sql;
64   my $batch_name = $args{batch_name} ? $args{batch_name} : 'CDR_DB';
65
66   my $queries = get_queries({
67     'dbd'                 => $dbd_type,
68     'host'                => $opt{H},
69     'table'               => $table,
70     'status_column'       => $status_column,
71     'status_column_info'  => $status_column_info,
72     'status_table'        => $status_table,
73     'primary_key'         => $pkey,
74     'primary_key_info'    => $pkey_info,
75   });
76
77   my $dsn = 'dbi:'. $dbd_type . $queries->{connect_type};
78   $dsn .= ";database=$database" if $database;
79
80   my $dbi = FS::DBI->connect($dsn, $opt{U}, $opt{P})
81     or die $FS::DBI::errstr;
82
83   adminsuidsetup $user;
84
85   ## check for status table if using. if not there create it.
86   if ($status_table) {
87     my $status = $dbi->selectall_arrayref( $queries->{check_statustable} );
88     if( ! @$status ) {
89       print "Adding status table $status_table ...\n";
90       $dbi->do( $queries->{create_statustable} )
91         or die $dbi->errstr;
92     }
93     $st_sql = "INSERT INTO $status_table ( $pkey, $status_column ) VALUES ( ?, 'done' )";
94   }
95   ## check for column freeside status if not using status table and create it if not there.
96   else {
97     my $status = $dbi->selectall_arrayref( $queries->{check_statuscolumn} );
98     if( ! @$status ) {
99       print "Adding $status_column column...\n";
100       $dbi->do( $queries->{create_statuscolumn} )
101         or die $dbi->errstr;
102     }
103     $st_sql = "UPDATE $table SET $status_column = 'done' WHERE $pkey = ?";
104   }
105
106   #my @cols = values %{ $args{column_map} };
107   my $sql = "SELECT $table.* FROM $table "; # join(',', @cols). " FROM $table ".
108   $sql .=  "LEFT JOIN $status_table ON ( $table.$pkey = $status_table.$pkey ) "
109     if $status_table;
110   $sql .= "WHERE  $status_column IS NULL ";
111
112   #$sql .= ' LIMIT '. $opt{L} if $opt{L};
113   my $sth = $dbi->prepare($sql);
114   $sth->execute or die $sth->errstr. " executing $sql";
115
116   my $cdr_batch = new FS::cdr_batch({ 
117       'cdrbatch' => $batch_name . '-import-'. time2str('%Y/%m/%d-%T',time),
118     });
119   my $error = $cdr_batch->insert;
120   die $error if $error;
121   my $cdrbatchnum = $cdr_batch->cdrbatchnum;
122   my $imported = 0;
123
124   my $row;
125   while ( $row = $sth->fetchrow_hashref ) {
126
127     my %hash = ( 'cdrbatchnum' => $cdrbatchnum );
128     foreach my $field ( keys %{ $args{column_map} } ) {
129       my $col_or_coderef = $args{column_map}->{$field};
130       if ( ref($col_or_coderef) eq 'CODE' ) {
131         $hash{$field} = &{ $col_or_coderef }( $row );
132       } else {
133         $hash{$field} = $row->{ $col_or_coderef };
134       }
135       $hash{$field} = '' if $hash{$field} =~ /^\s+$/; #IVR (MSSQL?) bs
136     }
137
138     my $cdr = FS::cdr->new(\%hash);
139
140     $cdr->cdrtypenum($opt{c}) if $opt{c};
141
142     my $pkey_value = $row->{$pkey};
143
144     #print "$pkey_value\n" if $opt{v};
145     my $error = $cdr->insert;
146
147     if ($error) {
148
149       #die "$pkey_value: failed import: $error\n";
150       print "$pkey_value: failed import: $error\n";
151
152     } else {
153
154       $imported++;
155
156       my $updated = $dbi->do($st_sql, undef, $pkey_value );
157       #$updates += $updated;
158       die "failed to set status: ".$dbi->errstr."\n" unless $updated;
159
160     }
161
162     if ( $opt{L} && $imported >= $opt{L} ) {
163       $sth->finish;
164       last;
165     }
166
167   }
168   print "Done.\n";
169   print "Imported $imported CDRs.\n" if $imported;
170
171   $dbi->disconnect;
172
173 }
174
175 sub cli_usage {
176   "Usage: \n  $0\n\t-H hostname\n\t[ -D database ]\n\t-U user\n\t-P password\n\t[ -c cdrtypenum ]\n\t[ -L num_cdrs_limit ]\n\t[ -T table ]\n\t[ -S status table ]\n\tfreesideuser\n";
177 }
178
179 sub get_queries {
180   #my ($dbd, $host, $table, $column, $column_create_info, $status_table, $primary_key, $primary_key_info) = @_;
181   my $info = shift;
182
183   #get host and port information.
184   my ($host, $port) = split /:/, $info->{host};
185   $host ||= 'localhost';
186   $port ||= '5000'; # check for pg default 5000 is sybase.
187
188   my %dbi_connect_types = (
189     'Sybase'  => ':server='.$host.';port='.$port,
190     'Pg'      => ':host='.$info->{host},
191   );
192
193   #Check for freeside status table
194   my %dbi_check_statustable = (
195     'Sybase'  => "SELECT * FROM sysobjects WHERE name = '$info->{status_table}'",
196     'Pg'      => "SELECT * FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '$info->{status_table}' AND column_name = '$info->{status_column}'",
197   );
198
199   #Create freeside status table
200   my %dbi_create_statustable = (
201     'Sybase'  => "CREATE TABLE $info->{status_table} ( $info->{primary_key} $info->{primary_key_info}, $info->{status_column} $info->{status_column_info} )",
202     'Pg'      => "CREATE TABLE $info->{status_table} ( $info->{primary_key} $info->{primary_key_info}, $info->{status_column} $info->{status_column_info} )",
203   );
204
205   #Check for freeside status column
206   my %dbi_check_statuscolumn = (
207     'Sybase'  => "SELECT syscolumns.name FROM sysobjects
208                   JOIN syscolumns ON sysobjects.id = syscolumns.id
209                   WHERE sysobjects.name LIKE '$info->{table}' AND syscolumns.name = '$info->{status_column}'",
210     'Pg'      => "SELECT * FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '$info->{table}' AND column_name = '$info->{status_column}' ",
211   );
212
213     #Create freeside status column
214   my %dbi_create_statuscolumn = (
215     'Sybase'  => "ALTER TABLE $info->{table} ADD $info->{status_column} $info->{status_column_info} NULL",
216     'Pg'      => "ALTER TABLE $info->{table} ADD COLUMN $info->{status_column} $info->{status_column_info}",
217   );
218
219   my $queries = {
220     'connect_type'         =>  $dbi_connect_types{$info->{dbd}},
221     'check_statustable'    =>  $dbi_check_statustable{$info->{dbd}},
222     'create_statustable'   =>  $dbi_create_statustable{$info->{dbd}},
223     'check_statuscolumn'   =>  $dbi_check_statuscolumn{$info->{dbd}},
224     'create_statuscolumn'  =>  $dbi_create_statuscolumn{$info->{dbd}},
225   };
226
227   return $queries;
228 }
229
230 =head1 BUGS
231
232 currently works with Pg(Postgresql) and Sybase(Sybase AES)
233
234 Sparse documentation.
235
236 =head1 SEE ALSO
237
238 L<FS::cdr>
239
240 =cut
241
242 1;