From b205131f91a5cdb5d1fe113518f6212d32c91d88 Mon Sep 17 00:00:00 2001 From: Mark Wells Date: Mon, 12 Aug 2013 17:40:58 -0700 Subject: [PATCH] cursored search, conserve memory during 3.x cust_pay upgrade, #23725 --- FS/FS/Cursor.pm | 120 +++++++++++++++++++++++++++++ FS/FS/PagedSearch.pm | 2 + FS/FS/Record.pm | 209 +++++++++++++++++++++++++++++++++++++++++++++++++++ FS/FS/cust_pay.pm | 25 +++--- 4 files changed, 344 insertions(+), 12 deletions(-) create mode 100644 FS/FS/Cursor.pm diff --git a/FS/FS/Cursor.pm b/FS/FS/Cursor.pm new file mode 100644 index 000000000..f3bc1e23d --- /dev/null +++ b/FS/FS/Cursor.pm @@ -0,0 +1,120 @@ +package FS::Cursor; + +use strict; +use vars qw($DEBUG $buffer); +use base qw( Exporter ); +use FS::Record qw(qsearch dbdef dbh); +use Data::Dumper; +use Scalar::Util qw(refaddr); + +$DEBUG = 0; +# this might become a parameter at some point, but right now, you can +# "local $FS::Cursor::buffer = X;" +$buffer = 200; + +=head1 NAME + +FS::Cursor - Iterator for querying large data sets + +=head1 SYNOPSIS + +use FS::Cursor; + +my $search = FS::Cursor->new('table', { field => 'value' ... }); +while ( my $row = $search->fetch ) { +... +} + +=head1 CLASS METHODS + +=over 4 + +=item new ARGUMENTS + +Constructs a cursored search. Accepts all the same arguments as qsearch, +and returns an FS::Cursor object to fetch the rows one at a time. + +=cut + +sub new { + my $class = shift; + my $q = FS::Record::_query(@_); # builds the statement and parameter list + + my $self = { + query => $q, + class => 'FS::' . ($q->{table} || 'Record'), + buffer => [], + }; + bless $self, $class; + + # the class of record object to return + $self->{class} = "FS::".($q->{table} || 'Record'); + + $self->{id} = sprintf('cursor%08x', refaddr($self)); + my $statement = "DECLARE ".$self->{id}." CURSOR FOR ".$q->{statement}; + + my $dbh = dbh; + my $sth = $dbh->prepare($statement) + or die $dbh->errstr; + my $bind = 0; + foreach my $value ( @{ $q->{value} } ) { + my $bind_type = shift @{ $q->{bind_type} }; + $sth->bind_param($bind++, $value, $bind_type ); + } + + $sth->execute or die $sth->errstr; + + $self->{fetch} = $dbh->prepare("FETCH FORWARD $buffer FROM ".$self->{id}); + + $self; +} + +=back + +=head1 METHODS + +=over 4 + +=item fetch + +Fetch the next row from the search results. + +=cut + +sub fetch { + # might be a little more efficient to do a FETCH NEXT 1000 or something + # and buffer them locally, but the semantics are simpler this way + my $self = shift; + if (@{ $self->{buffer} } == 0) { + my $rows = $self->refill; + return undef if !$rows; + } + $self->{class}->new(shift @{ $self->{buffer} }); +} + +sub refill { + my $self = shift; + my $sth = $self->{fetch}; + $sth->execute or die $sth->errstr; + my $result = $self->{fetch}->fetchall_arrayref( {} ); + $self->{buffer} = $result; + scalar @$result; +} + +=back + +=head1 TO DO + +Replace all uses of qsearch with this. + +=head1 BUGS + +Doesn't support MySQL. + +=head1 SEE ALSO + +L + +=cut + +1; diff --git a/FS/FS/PagedSearch.pm b/FS/FS/PagedSearch.pm index 09d05c4e6..e740965ea 100644 --- a/FS/FS/PagedSearch.pm +++ b/FS/FS/PagedSearch.pm @@ -184,6 +184,8 @@ sub refill { L +L is an eventual replacement for this. + =cut 1; diff --git a/FS/FS/Record.pm b/FS/FS/Record.pm index 87947f085..e6fbf92b6 100644 --- a/FS/FS/Record.pm +++ b/FS/FS/Record.pm @@ -529,6 +529,215 @@ sub qsearch { return @return; } +=item _query + +Construct the SQL statement and parameter-binding list for qsearch. Takes +the qsearch parameters. + +Returns a hash containing: +'table': The primary table name (if there is one). +'statement': The SQL statement itself. +'bind_type': An arrayref of bind types. +'value': An arrayref of parameter values. +'cache': The cache object, if one was passed. + +=cut + +sub _query { + my( @stable, @record, @cache ); + my( @select, @extra_sql, @extra_param, @order_by, @addl_from ); + my @debug = (); + my $cursor = ''; + my %union_options = (); + if ( ref($_[0]) eq 'ARRAY' ) { + my $optlist = shift; + %union_options = @_; + foreach my $href ( @$optlist ) { + push @stable, ( $href->{'table'} or die "table name is required" ); + push @record, ( $href->{'hashref'} || {} ); + push @select, ( $href->{'select'} || '*' ); + push @extra_sql, ( $href->{'extra_sql'} || '' ); + push @extra_param, ( $href->{'extra_param'} || [] ); + push @order_by, ( $href->{'order_by'} || '' ); + push @cache, ( $href->{'cache_obj'} || '' ); + push @addl_from, ( $href->{'addl_from'} || '' ); + push @debug, ( $href->{'debug'} || '' ); + } + die "at least one hashref is required" unless scalar(@stable); + } elsif ( ref($_[0]) eq 'HASH' ) { + my $opt = shift; + $stable[0] = $opt->{'table'} or die "table name is required"; + $record[0] = $opt->{'hashref'} || {}; + $select[0] = $opt->{'select'} || '*'; + $extra_sql[0] = $opt->{'extra_sql'} || ''; + $extra_param[0] = $opt->{'extra_param'} || []; + $order_by[0] = $opt->{'order_by'} || ''; + $cache[0] = $opt->{'cache_obj'} || ''; + $addl_from[0] = $opt->{'addl_from'} || ''; + $debug[0] = $opt->{'debug'} || ''; + } else { + ( $stable[0], + $record[0], + $select[0], + $extra_sql[0], + $cache[0], + $addl_from[0] + ) = @_; + $select[0] ||= '*'; + } + my $cache = $cache[0]; + + my @statement = (); + my @value = (); + my @bind_type = (); + + my $result_table = $stable[0]; + foreach my $stable ( @stable ) { + #stop altering the caller's hashref + my $record = { %{ shift(@record) || {} } };#and be liberal in receipt + my $select = shift @select; + my $extra_sql = shift @extra_sql; + my $extra_param = shift @extra_param; + my $order_by = shift @order_by; + my $cache = shift @cache; + my $addl_from = shift @addl_from; + my $debug = shift @debug; + + #$stable =~ /^([\w\_]+)$/ or die "Illegal table: $table"; + #for jsearch + $stable =~ /^([\w\s\(\)\.\,\=]+)$/ or die "Illegal table: $stable"; + $stable = $1; + + $result_table = '' if $result_table ne $stable; + + my $table = $cache ? $cache->table : $stable; + my $dbdef_table = dbdef->table($table) + or die "No schema for table $table found - ". + "do you need to run freeside-upgrade?"; + my $pkey = $dbdef_table->primary_key; + + my @real_fields = grep exists($record->{$_}), real_fields($table); + + my $statement .= "SELECT $select FROM $stable"; + $statement .= " $addl_from" if $addl_from; + if ( @real_fields ) { + $statement .= ' WHERE '. join(' AND ', + get_real_fields($table, $record, \@real_fields)); + } + + $statement .= " $extra_sql" if defined($extra_sql); + $statement .= " $order_by" if defined($order_by); + + push @statement, $statement; + + warn "[debug]$me $statement\n" if $DEBUG > 1 || $debug; + + + foreach my $field ( + grep defined( $record->{$_} ) && $record->{$_} ne '', @real_fields + ) { + + my $value = $record->{$field}; + my $op = (ref($value) && $value->{op}) ? $value->{op} : '='; + $value = $value->{'value'} if ref($value); + my $type = dbdef->table($table)->column($field)->type; + + my $bind_type = _bind_type($type, $value); + + #if ( $DEBUG > 2 ) { + # no strict 'refs'; + # %TYPE = map { &{"DBI::$_"}() => $_ } @{ $DBI::EXPORT_TAGS{sql_types} } + # unless keys %TYPE; + # warn " bind_param $bind (for field $field), $value, TYPE $TYPE{$TYPE}\n"; + #} + + push @value, $value; + push @bind_type, $bind_type; + + } + + foreach my $param ( @$extra_param ) { + my $bind_type = { TYPE => SQL_VARCHAR }; + my $value = $param; + if ( ref($param) ) { + $value = $param->[0]; + my $type = $param->[1]; + $bind_type = _bind_type($type, $value); + } + push @value, $value; + push @bind_type, $bind_type; + } + } + + my $statement = join( ' ) UNION ( ', @statement ); + $statement = "( $statement )" if scalar(@statement) > 1; + $statement .= " $union_options{order_by}" if $union_options{order_by}; + + return { + statement => $statement, + bind_type => \@bind_type, + value => \@value, + table => $result_table, + cache => $cache, + }; +} + +# qsearch should eventually use this +sub _from_hashref { + my ($table, $cache, @hashrefs) = @_; + my @return; + # XXX get rid of these string evals at some point + # (when we have time to test it) + # my $class = "FS::$table" if $table; + # if ( $class and $class->isa('FS::Record') ) + # if ( $class->can('new') eq \&new ) + # + if ( $table && eval 'scalar(@FS::'. $table. '::ISA);' ) { + if ( eval 'FS::'. $table. '->can(\'new\')' eq \&new ) { + #derivied class didn't override new method, so this optimization is safe + if ( $cache ) { + @return = map { + new_or_cached( "FS::$table", { %{$_} }, $cache ) + } @hashrefs; + } else { + @return = map { + new( "FS::$table", { %{$_} } ) + } @hashrefs; + } + } else { + #okay, its been tested + # warn "untested code (class FS::$table uses custom new method)"; + @return = map { + eval 'FS::'. $table. '->new( { %{$_} } )'; + } @hashrefs; + } + + # Check for encrypted fields and decrypt them. + ## only in the local copy, not the cached object + if ( $conf_encryption + && eval 'defined(@FS::'. $table . '::encrypted_fields)' ) { + foreach my $record (@return) { + foreach my $field (eval '@FS::'. $table . '::encrypted_fields') { + next if $field eq 'payinfo' + && ($record->isa('FS::payinfo_transaction_Mixin') + || $record->isa('FS::payinfo_Mixin') ) + && $record->payby + && !grep { $record->payby eq $_ } @encrypt_payby; + # Set it directly... This may cause a problem in the future... + $record->setfield($field, $record->decrypt($record->getfield($field))); + } + } + } + } else { + cluck "warning: FS::$table not loaded; returning FS::Record objects" + unless $nowarn_classload; + @return = map { + FS::Record->new( $table, { %{$_} } ); + } @hashrefs; + } + return @return; +} + ## makes this easier to read sub get_real_fields { diff --git a/FS/FS/cust_pay.pm b/FS/FS/cust_pay.pm index 2e97429ed..605f21c35 100644 --- a/FS/FS/cust_pay.pm +++ b/FS/FS/cust_pay.pm @@ -23,6 +23,7 @@ use FS::cust_main; use FS::cust_pkg; use FS::cust_pay_void; use FS::upgrade_journal; +use FS::Cursor; $DEBUG = 0; @@ -1037,11 +1038,11 @@ sub _upgrade_data { #class method ### # migrate batchnums from the misused 'paybatch' field to 'batchnum' ### - my @cust_pay = qsearch( { - 'table' => 'cust_pay', - 'addl_from' => ' JOIN pay_batch ON cust_pay.paybatch = CAST(pay_batch.batchnum AS text) ', + my $search = FS::Cursor->new( { + 'table' => 'cust_pay', + 'addl_from' => ' JOIN pay_batch ON cust_pay.paybatch = CAST(pay_batch.batchnum AS text) ', } ); - foreach my $cust_pay (@cust_pay) { + while (my $cust_pay = $search->fetch) { $cust_pay->set('batchnum' => $cust_pay->paybatch); $cust_pay->set('paybatch' => ''); my $error = $cust_pay->replace; @@ -1060,14 +1061,14 @@ sub _upgrade_data { #class method foreach my $table (qw(cust_pay cust_pay_void cust_refund)) { my $and_batchnum_is_null = ( $table =~ /^cust_pay/ ? ' AND batchnum IS NULL' : '' ); - foreach my $object ( qsearch({ - table => $table, - extra_sql => "WHERE payby IN('CARD','CHEK') ". - "AND (paybatch IS NOT NULL ". - "OR (paybatch IS NULL AND auth IS NULL - $and_batchnum_is_null ) )", - }) ) - { + my $search = FS::Cursor->new({ + table => $table, + extra_sql => "WHERE payby IN('CARD','CHEK') ". + "AND (paybatch IS NOT NULL ". + "OR (paybatch IS NULL AND auth IS NULL + $and_batchnum_is_null ) )", + }); + while ( my $object = $search->fetch ) { if ( $object->paybatch eq '' ) { # repair for a previous upgrade that didn't save 'auth' my $pkey = $object->primary_key; -- 2.11.0