paged search to conserve memory in CDR processing, #16723
authorMark Wells <mark@freeside.biz>
Sat, 19 May 2012 21:57:29 +0000 (14:57 -0700)
committerMark Wells <mark@freeside.biz>
Sat, 19 May 2012 21:57:29 +0000 (14:57 -0700)
FS/FS/ClientAPI/MyAccount.pm
FS/FS/PagedSearch.pm [new file with mode: 0644]
FS/FS/part_pkg/voip_cdr.pm
FS/FS/part_pkg/voip_inbound.pm
FS/FS/part_pkg/voip_tiered.pm
FS/FS/svc_pbx.pm
FS/FS/svc_phone.pm

index a07e345..e79fbfc 100644 (file)
@@ -1913,6 +1913,8 @@ sub list_support_usage {
 
 sub _list_cdr_usage {
   # XXX CDR type support...
+  # XXX any way to do a paged search on this?
+  # we have to return the results all at once...
   my($svc_phone, $begin, $end, %opt) = @_;
   map [ $_->downstream_csv(%opt, 'keeparray' => 1) ],
     $svc_phone->get_cdrs( 'begin'=>$begin, 'end'=>$end, );
diff --git a/FS/FS/PagedSearch.pm b/FS/FS/PagedSearch.pm
new file mode 100644 (file)
index 0000000..09d05c4
--- /dev/null
@@ -0,0 +1,189 @@
+package FS::PagedSearch;
+
+use strict;
+use vars qw($DEBUG $default_limit @EXPORT_OK);
+use base qw( Exporter );
+use FS::Record qw(qsearch dbdef);
+use Data::Dumper;
+
+$DEBUG = 0;
+$default_limit = 100;
+
+@EXPORT_OK = 'psearch';
+
+=head1 NAME
+
+FS::PagedSearch - Iterator for querying large data sets
+
+=head1 SYNOPSIS
+
+use FS::PagedSearch qw(psearch);
+
+my $search = psearch('table', { field => 'value' ... });
+$search->limit(100); #optional
+while ( my $row = $search->fetch ) {
+...
+}
+
+=head1 SUBROUTINES
+
+=over 4
+
+=item psearch ARGUMENTS
+
+A wrapper around L<FS::Record::qsearch>.  Accepts all the same arguments 
+as qsearch, except for the arrayref union query mode, and returns an 
+FS::PagedSearch object to access the rows of the query one at a time.  
+If the query doesn't contain an ORDER BY clause already, it will be ordered
+by the table's primary key.
+
+=cut
+
+sub psearch {
+  # deep-copy qsearch args
+  my $q;
+  if ( ref($_[0]) eq 'ARRAY' ) {
+    die "union query not supported with psearch"; #yet
+  }
+  elsif ( ref($_[0]) eq 'HASH' ) {
+    %$q = %{ $_[0] };
+  }
+  else {
+    $q = {
+      'table'     => shift,
+      'hashref'   => shift,
+      'select'    => shift,
+      'extra_sql' => shift,
+      'cache_obj' => shift,
+      'addl_from' => shift,
+    };
+  }
+  warn Dumper($q) if $DEBUG > 1;
+
+  # clean up query
+  my $dbdef = dbdef->table($q->{table});
+  # qsearch just appends order_by to extra_sql, so do that ourselves
+  $q->{extra_sql} ||= '';
+  $q->{extra_sql} .= ' '.$q->{order_by} if $q->{order_by};
+  $q->{order_by} = '';
+  # and impose an ordering if needed
+  if ( not $q->{extra_sql} =~ /order by/i ) {
+    $q->{extra_sql} .= ' ORDER BY '.$dbdef->primary_key;
+  }
+  # and then we'll use order_by for LIMIT/OFFSET
+
+  my $self = {
+    query     => $q,
+    buffer    => [],
+    offset    => 0,
+    limit     => $default_limit,
+    increment => 1,
+  };
+  bless $self, 'FS::PagedSearch';
+
+  $self;
+}
+
+=back
+
+=head1 METHODS
+
+=over 4
+
+=item fetch
+
+Fetch the next row from the search results and remove it from the buffer.
+Returns undef if there are no more rows.
+
+=cut
+
+sub fetch {
+  my $self = shift;
+  my $b = $self->{buffer};
+  $self->refill if @$b == 0;
+  $self->{offset} += $self->{increment} if @$b;
+  return shift @$b;
+}
+
+=item adjust ROWS
+
+Add ROWS to the offset counter.  This won't cause rows to be skipped in the
+current buffer but will affect the starting point of the next refill.
+
+=cut
+
+sub adjust {
+  my $self = shift;
+  my $r = shift;
+  $self->{offset} += $r;
+}
+
+=item limit [ VALUE ]
+
+Set/get the number of rows to retrieve per page.  The default is 100.
+
+=cut
+
+sub limit {
+  my $self = shift;
+  my $new_limit = shift;
+  if ( defined($new_limit) ) {
+    $self->{limit} = $new_limit;
+  }
+  $self->{limit};
+}
+
+=item increment [ VALUE ]
+
+Set/get the number of rows to increment the offset for each row that's
+retrieved.  Defaults to 1.  If the rows are being modified in a way that 
+removes them from the result set of the query, it's probably wise to set 
+this to zero.  Setting it to anything else is probably nonsense.
+
+=cut
+
+sub increment {
+  my $self = shift;
+  my $new_inc = shift;
+  if ( defined($new_inc) ) {
+    $self->{increment} = $new_inc;
+  }
+  $self->{increment};
+}
+
+
+=item refill
+
+Run the query, skipping a number of rows set by the row offset, and replace 
+the contents of the buffer with the result.  If there are no more rows, 
+this will just empty the buffer.  Called automatically as needed; don't call 
+this from outside.
+
+=cut
+
+sub refill {
+  my $self = shift;
+  my $b = $self->{buffer};
+  warn "refilling (limit ".$self->{limit}.", offset ".$self->{offset}.")\n"
+    if $DEBUG;
+  warn "discarding ".scalar(@$b)." rows\n" if $DEBUG and @$b;
+  if ( $self->{limit} > 0 ) {
+    $self->{query}->{order_by} = 'LIMIT ' . $self->{limit} . 
+                                 ' OFFSET ' . $self->{offset};
+  }
+  @$b = qsearch( $self->{query} );
+  my $rows = scalar @$b;
+  warn "$rows returned\n" if $DEBUG;
+
+  $rows;
+}
+
+=back
+
+=head1 SEE ALSO
+
+L<FS::Record>
+
+=cut
+
+1;
index aaad974..8c3d80d 100644 (file)
@@ -401,9 +401,10 @@ sub calc_usage {
     #my @invoice_details_sort;
 
     #first rate any outstanding CDRs not yet rated
-    foreach my $cdr (
-      $svc_x->get_cdrs( %options )
-    ) {
+    my $cdr_search = $svc_x->psearch_cdrs(%options);
+    $cdr_search->limit(1000);
+    $cdr_search->increment(0); # because we're changing their status as we go
+    while ( my $cdr = $cdr_search->fetch ) {
 
       my $error = $cdr->rate(
         'part_pkg'                          => $self,
@@ -414,14 +415,19 @@ sub calc_usage {
       );
       die $error if $error; #??
 
+      $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+      # it was skipped without changing status, so increment the 
+      # offset so that we don't re-fetch it on refill
+
     } # $cdr
 
     #then add details to invoices & get a total
     $options{'status'} = 'rated';
 
-    foreach my $cdr (
-      $svc_x->get_cdrs( %options ) 
-    ) {
+    $cdr_search = $svc_x->psearch_cdrs(%options);
+    $cdr_search->limit(1000);
+    $cdr_search->increment(0);
+    while ( my $cdr = $cdr_search->fetch ) {
       my $error;
       # at this point we officially Do Not Care about the rating method
       if ( $included_calls > 0 ) {
@@ -436,7 +442,9 @@ sub calc_usage {
       }
       die $error if $error;
       $formatter->append($cdr);
-    }
+
+      $cdr_search->adjust(1) if $cdr->freesidestatus eq 'rated';
+    } #$cdr
   }
 
   $formatter->finish; #writes into $details
index f4e5183..ecc4f47 100644 (file)
@@ -227,13 +227,15 @@ sub calc_usage {
   ) {
     my $svc_phone = $cust_svc->svc_x;
 
-    foreach my $cdr ( $svc_phone->get_cdrs(
+    my $cdr_search = $svc_phone->psearch_cdrs(
       'inbound'        => 1,
       'default_prefix' => $self->option('default_prefix'),
       'status'         => '', # unprocessed only
       'for_update'     => 1,
-      )
-    ) {
+    );
+    $cdr_search->limit(1000);
+    $cdr_search->increment(0);
+    while ( my $cdr = $cdr_search->fetch ) {
 
       my $reason = $self->check_chargable( $cdr,
                                            'option_cache' => \%opt_cache,
@@ -310,6 +312,8 @@ sub calc_usage {
       die $error if $error;
       $formatter->append($cdr);
 
+      $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+
     } #$cdr
   } # $cust_svc
 #  unshift @$details, { format => 'C',
index e5dcf6d..d8d74c1 100644 (file)
@@ -132,9 +132,11 @@ sub calc_usage {
 
       $options{'inbound'} = ( $pass eq 'inbound' );
 
-      foreach my $cdr (
-        $svc_x->get_cdrs( %options )
-      ) {
+      my $cdr_search = $svc_x->psearch_cdrs(%options);
+      $cdr_search->limit(1000);
+      $cdr_search->increment(0);
+      while ( my $cdr = $cdr_search->fetch ) {
+
         if ( $DEBUG > 1 ) {
           warn "rating CDR $cdr\n".
                join('', map { "  $_ => ". $cdr->{$_}. "\n" } keys %$cdr );
@@ -173,6 +175,8 @@ sub calc_usage {
 
         $total += $charge_min;
 
+        $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+
       } # $cdr
 
     } # $pass
@@ -213,9 +217,10 @@ sub calc_usage {
       # tell the formatter what we're sending it
       $formatter->inbound($options{'inbound'});
 
-      foreach my $cdr (
-        $svc_x->get_cdrs( %options )
-      ) {
+      my $cdr_search = $svc_x->psearch_cdrs(%options);
+      $cdr_search->limit(1000);
+      $cdr_search->increment(0);
+      while ( my $cdr = $cdr_search->fetch ) {
 
         my $object = $options{'inbound'}
                        ? $cdr->cdr_termination( 1 ) #1: inbound
@@ -242,6 +247,8 @@ sub calc_usage {
 
         $formatter->append($cdr);
 
+        $cdr_search->adjust(1) if $cdr->freesidestatus eq 'processing-tiered';
+
       } # $cdr
 
     } # $pass
index f8b9605..4182a13 100644 (file)
@@ -3,6 +3,7 @@ package FS::svc_pbx;
 use strict;
 use base qw( FS::svc_External_Common );
 use FS::Record qw( qsearch qsearchs dbh );
+use FS::PagedSearch qw( psearch );
 use FS::Conf;
 use FS::cust_svc;
 use FS::svc_phone;
@@ -259,11 +260,13 @@ sub _check_duplicate {
   return '';
 }
 
-=item get_cdrs
+=item psearch_cdrs OPTIONS
 
-Returns a set of Call Detail Records (see L<FS::cdr>) associated with this 
-service.  By default, "associated with" means that the "charged_party" field of
-the CDR matches the "title" field of the service.
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records 
+associated with this service.  By default, "associated with" means that 
+the "charged_party" field of the CDR matches the "title" field of the 
+service.  To access the CDRs themselves, call "->fetch" on the resulting
+object.
 
 =over 2
 
@@ -295,7 +298,7 @@ to allow title to indicate a range of IP addresses.
 
 =cut
 
-sub get_cdrs {
+sub psearch_cdrs {
   my($self, %options) = @_;
   my %hash = ();
   my @where = ();
@@ -343,15 +346,26 @@ sub get_cdrs {
   my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where )
     if @where;
 
-  my @cdrs =
-    qsearch( {
+  psearch( {
       'table'      => 'cdr',
       'hashref'    => \%hash,
       'extra_sql'  => $extra_sql,
       'order_by'   => "ORDER BY startdate $for_update",
-    } );
+  } );
+}
+
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a 
+single list.  Arguments are the same as for psearch_cdrs.  This can take
+an unreasonably large amount of memory and is best avoided.
 
-  @cdrs;
+=cut
+
+sub get_cdrs {
+  my $self = shift;
+  my $psearch = $self->psearch_cdrs($_);
+  qsearch ( $psearch->{query} )
 }
 
 =back
index b395ea6..1296c1e 100644 (file)
@@ -7,6 +7,7 @@ use Data::Dumper;
 use Scalar::Util qw( blessed );
 use FS::Conf;
 use FS::Record qw( qsearch qsearchs dbh );
+use FS::PagedSearch qw( psearch );
 use FS::Msgcat qw(gettext);
 use FS::part_svc;
 use FS::phone_device;
@@ -648,11 +649,13 @@ sub cust_location_or_main {
   $cust_pkg ? $cust_pkg->cust_location_or_main : '';
 }
 
-=item get_cdrs
+=item psearch_cdrs OPTIONS
 
-Returns a set of Call Detail Records (see L<FS::cdr>) associated with this 
-service.  By default, "associated with" means that either the "src" or the 
-"charged_party" field of the CDR matches the "phonenum" field of the service.
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records 
+associated with this service.  By default, "associated with" means that 
+either the "src" or the "charged_party" field of the CDR matches the 
+"phonenum" field of the service.  To access the CDRs themselves, call
+"->fetch" on the resulting object.
 
 =over 2
 
@@ -676,11 +679,16 @@ with the chosen prefix.
 
 =item by_svcnum: not supported for svc_phone
 
+=item billsec_sum: Instead of returning all of the CDRs, return a single
+record (as an L<FS::cdr> object) with the sum of the 'billsec' field over 
+the entire result set.
+
 =back
 
 =cut
 
-sub get_cdrs {
+sub psearch_cdrs {
+
   my($self, %options) = @_;
   my @fields;
   my %hash;
@@ -739,18 +747,30 @@ sub get_cdrs {
 
   my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where );
 
-  my @cdrs =
-    qsearch( {
+  psearch( {
       'table'      => 'cdr',
       'hashref'    => \%hash,
       'extra_sql'  => $extra_sql,
       'order_by'   => $options{'billsec_sum'} ? '' : "ORDER BY startdate $for_update",
       'select'     => $options{'billsec_sum'} ? 'sum(billsec) as billsec_sum' : '*',
-    } );
+  } );
+}
+
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a 
+single list.  Arguments are the same as for psearch_cdrs.  This can take 
+an unreasonably large amount of memory and is best avoided.
 
-  @cdrs;
+=cut
+
+sub get_cdrs {
+  my $self = shift;
+  my $psearch = $self->psearch_cdrs(@_);
+  qsearch ( $psearch->{query} )
 }
 
+
 =back
 
 =head1 BUGS