summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Wells <mark@freeside.biz>2012-05-19 14:57:29 -0700
committerMark Wells <mark@freeside.biz>2012-05-19 14:57:29 -0700
commit2b740ea3cbb32a3fb3906546552503d9d3cca590 (patch)
treeabbb98e7671971b947c30929ab4a00e742ef4e66
parentbec96a5b94e6c2484a48ed2d4300a1294fa80de6 (diff)
paged search to conserve memory in CDR processing, #16723
-rw-r--r--FS/FS/ClientAPI/MyAccount.pm2
-rw-r--r--FS/FS/PagedSearch.pm189
-rw-r--r--FS/FS/part_pkg/voip_cdr.pm22
-rw-r--r--FS/FS/part_pkg/voip_inbound.pm10
-rw-r--r--FS/FS/part_pkg/voip_tiered.pm19
-rw-r--r--FS/FS/svc_pbx.pm32
-rw-r--r--FS/FS/svc_phone.pm38
7 files changed, 278 insertions, 34 deletions
diff --git a/FS/FS/ClientAPI/MyAccount.pm b/FS/FS/ClientAPI/MyAccount.pm
index a07e345..e79fbfc 100644
--- a/FS/FS/ClientAPI/MyAccount.pm
+++ b/FS/FS/ClientAPI/MyAccount.pm
@@ -1913,6 +1913,8 @@ sub list_support_usage {
sub _list_cdr_usage {
# XXX CDR type support...
+ # XXX any way to do a paged search on this?
+ # we have to return the results all at once...
my($svc_phone, $begin, $end, %opt) = @_;
map [ $_->downstream_csv(%opt, 'keeparray' => 1) ],
$svc_phone->get_cdrs( 'begin'=>$begin, 'end'=>$end, );
diff --git a/FS/FS/PagedSearch.pm b/FS/FS/PagedSearch.pm
new file mode 100644
index 0000000..09d05c4
--- /dev/null
+++ b/FS/FS/PagedSearch.pm
@@ -0,0 +1,189 @@
+package FS::PagedSearch;
+
+use strict;
+use vars qw($DEBUG $default_limit @EXPORT_OK);
+use base qw( Exporter );
+use FS::Record qw(qsearch dbdef);
+use Data::Dumper;
+
+# Debug verbosity: any true value makes refill warn about its paging
+# activity; values above 1 also make psearch dump the normalized query.
+$DEBUG = 0;
+# Page size (rows per refill) given to every new search object; can be
+# changed per object with ->limit.
+$default_limit = 100;
+
+# psearch is exported only on request.
+@EXPORT_OK = 'psearch';
+
+=head1 NAME
+
+FS::PagedSearch - Iterator for querying large data sets
+
+=head1 SYNOPSIS
+
+use FS::PagedSearch qw(psearch);
+
+my $search = psearch('table', { field => 'value' ... });
+$search->limit(100); #optional
+while ( my $row = $search->fetch ) {
+...
+}
+
+=head1 SUBROUTINES
+
+=over 4
+
+=item psearch ARGUMENTS
+
+A wrapper around L<FS::Record::qsearch>. Accepts all the same arguments
+as qsearch, except for the arrayref union query mode, and returns an
+FS::PagedSearch object to access the rows of the query one at a time.
+If the query doesn't contain an ORDER BY clause already, it will be ordered
+by the table's primary key.
+
+=cut
+
+sub psearch {
+  # Normalize the qsearch-style arguments into a private hashref, so the
+  # rewriting below never modifies the caller's hash.  Note this is only a
+  # top-level copy: nested values such as 'hashref' are still shared.
+  my $q;
+  if ( ref($_[0]) eq 'ARRAY' ) {
+    die "union query not supported with psearch"; #yet
+  }
+  elsif ( ref($_[0]) eq 'HASH' ) {
+    $q = { %{ $_[0] } };
+  }
+  else {
+    # positional form, same argument order as qsearch
+    $q = {
+      'table'     => shift,
+      'hashref'   => shift,
+      'select'    => shift,
+      'extra_sql' => shift,
+      'cache_obj' => shift,
+      'addl_from' => shift,
+    };
+  }
+  warn Dumper($q) if $DEBUG > 1;
+
+  # clean up query
+  # qsearch just appends order_by to extra_sql, so do that ourselves;
+  # that frees the order_by slot for refill to carry LIMIT/OFFSET
+  $q->{extra_sql} ||= '';
+  $q->{extra_sql} .= ' '.$q->{order_by} if $q->{order_by};
+  $q->{order_by} = '';
+
+  # and impose an ordering if needed, since paging with OFFSET is only
+  # meaningful over a stable row order.  Match ORDER BY regardless of how
+  # much whitespace (or a line break) separates the two words.
+  if ( $q->{extra_sql} !~ /\border\s+by\b/i ) {
+    my $table = defined($q->{table}) ? $q->{table} : '';
+    my $dbdef = dbdef->table($table)
+      or die "psearch: no schema definition for table '$table'";
+    # fail here rather than emit a dangling "ORDER BY " into the SQL
+    my $pkey = $dbdef->primary_key
+      or die "psearch: table '$table' has no primary key; ".
+             "an explicit ORDER BY is required";
+    $q->{extra_sql} .= ' ORDER BY '.$pkey;
+  }
+
+  my $self = {
+    query     => $q,
+    buffer    => [],             # rows fetched by refill, not yet returned
+    offset    => 0,              # OFFSET for the next refill
+    limit     => $default_limit, # rows per page
+    increment => 1,              # offset advance per row returned by fetch
+  };
+  bless $self, 'FS::PagedSearch';
+
+  $self;
+}
+
+=back
+
+=head1 METHODS
+
+=over 4
+
+=item fetch
+
+Fetch the next row from the search results and remove it from the buffer.
+Returns undef if there are no more rows.
+
+=cut
+
+sub fetch {
+  my ($self) = @_;
+  my $rows = $self->{buffer};
+
+  # Nothing buffered: pull in the next page.  refill replaces the contents
+  # of this same array, so $rows stays valid afterwards.
+  $self->refill unless @$rows;
+
+  # Only advance the offset when there is actually a row to hand out.
+  if ( @$rows ) {
+    $self->{offset} += $self->{increment};
+  }
+
+  # undef once the buffer is still empty after a refill
+  return shift @$rows;
+}
+
+=item adjust ROWS
+
+Add ROWS to the offset counter. This won't cause rows to be skipped in the
+current buffer but will affect the starting point of the next refill.
+
+=cut
+
+sub adjust {
+  my ($self, $rows) = @_;
+  # Moves only the starting point of the next refill; rows already sitting
+  # in the buffer are untouched.  Evaluates to the new offset.
+  $self->{offset} += $rows;
+}
+
+=item limit [ VALUE ]
+
+Set/get the number of rows to retrieve per page. The default is 100.
+
+=cut
+
+sub limit {
+  my ($self, $new_limit) = @_;
+  # act as a setter only when a defined value was supplied
+  $self->{limit} = $new_limit if defined $new_limit;
+  # always hand back the current page size
+  $self->{limit};
+}
+
+=item increment [ VALUE ]
+
+Set/get the number of rows to increment the offset for each row that's
+retrieved. Defaults to 1. If the rows are being modified in a way that
+removes them from the result set of the query, it's probably wise to set
+this to zero. Setting it to anything else is probably nonsense.
+
+=cut
+
+sub increment {
+  my ($self, $new_inc) = @_;
+  # act as a setter only when a defined value was supplied (0 is allowed)
+  $self->{increment} = $new_inc if defined $new_inc;
+  # always hand back the current per-row offset step
+  $self->{increment};
+}
+
+
+=item refill
+
+Run the query, skipping a number of rows set by the row offset, and replace
+the contents of the buffer with the result. If there are no more rows,
+this will just empty the buffer. Called automatically as needed; don't call
+this from outside.
+
+=cut
+
+sub refill {
+  my ($self) = @_;
+  my $page = $self->{buffer};
+  my ($limit, $offset) = @{$self}{qw( limit offset )};
+
+  if ( $DEBUG ) {
+    warn "refilling (limit $limit, offset $offset)\n";
+    warn "discarding ".scalar(@$page)." rows\n" if @$page;
+  }
+
+  # psearch folded any ORDER BY into extra_sql, so the order_by slot is
+  # free to carry the paging clause.  A non-positive limit leaves the
+  # query's order_by as it was.
+  $self->{query}->{order_by} = "LIMIT $limit OFFSET $offset"
+    if $limit > 0;
+
+  # overwrite the buffer in place so existing references to it stay valid
+  @$page = qsearch( $self->{query} );
+  my $count = scalar @$page;
+  warn "$count returned\n" if $DEBUG;
+
+  # number of rows now buffered
+  $count;
+}
+
+=back
+
+=head1 SEE ALSO
+
+L<FS::Record>
+
+=cut
+
+1;
diff --git a/FS/FS/part_pkg/voip_cdr.pm b/FS/FS/part_pkg/voip_cdr.pm
index aaad974..8c3d80d 100644
--- a/FS/FS/part_pkg/voip_cdr.pm
+++ b/FS/FS/part_pkg/voip_cdr.pm
@@ -401,9 +401,10 @@ sub calc_usage {
#my @invoice_details_sort;
#first rate any outstanding CDRs not yet rated
- foreach my $cdr (
- $svc_x->get_cdrs( %options )
- ) {
+ my $cdr_search = $svc_x->psearch_cdrs(%options);
+ $cdr_search->limit(1000);
+ $cdr_search->increment(0); # because we're changing their status as we go
+ while ( my $cdr = $cdr_search->fetch ) {
my $error = $cdr->rate(
'part_pkg' => $self,
@@ -414,14 +415,19 @@ sub calc_usage {
);
die $error if $error; #??
+ $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+ # it was skipped without changing status, so increment the
+ # offset so that we don't re-fetch it on refill
+
} # $cdr
#then add details to invoices & get a total
$options{'status'} = 'rated';
- foreach my $cdr (
- $svc_x->get_cdrs( %options )
- ) {
+ $cdr_search = $svc_x->psearch_cdrs(%options);
+ $cdr_search->limit(1000);
+ $cdr_search->increment(0);
+ while ( my $cdr = $cdr_search->fetch ) {
my $error;
# at this point we officially Do Not Care about the rating method
if ( $included_calls > 0 ) {
@@ -436,7 +442,9 @@ sub calc_usage {
}
die $error if $error;
$formatter->append($cdr);
- }
+
+ $cdr_search->adjust(1) if $cdr->freesidestatus eq 'rated';
+ } #$cdr
}
$formatter->finish; #writes into $details
diff --git a/FS/FS/part_pkg/voip_inbound.pm b/FS/FS/part_pkg/voip_inbound.pm
index f4e5183..ecc4f47 100644
--- a/FS/FS/part_pkg/voip_inbound.pm
+++ b/FS/FS/part_pkg/voip_inbound.pm
@@ -227,13 +227,15 @@ sub calc_usage {
) {
my $svc_phone = $cust_svc->svc_x;
- foreach my $cdr ( $svc_phone->get_cdrs(
+ my $cdr_search = $svc_phone->psearch_cdrs(
'inbound' => 1,
'default_prefix' => $self->option('default_prefix'),
'status' => '', # unprocessed only
'for_update' => 1,
- )
- ) {
+ );
+ $cdr_search->limit(1000);
+ $cdr_search->increment(0);
+ while ( my $cdr = $cdr_search->fetch ) {
my $reason = $self->check_chargable( $cdr,
'option_cache' => \%opt_cache,
@@ -310,6 +312,8 @@ sub calc_usage {
die $error if $error;
$formatter->append($cdr);
+ $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+
} #$cdr
} # $cust_svc
# unshift @$details, { format => 'C',
diff --git a/FS/FS/part_pkg/voip_tiered.pm b/FS/FS/part_pkg/voip_tiered.pm
index e5dcf6d..d8d74c1 100644
--- a/FS/FS/part_pkg/voip_tiered.pm
+++ b/FS/FS/part_pkg/voip_tiered.pm
@@ -132,9 +132,11 @@ sub calc_usage {
$options{'inbound'} = ( $pass eq 'inbound' );
- foreach my $cdr (
- $svc_x->get_cdrs( %options )
- ) {
+ my $cdr_search = $svc_x->psearch_cdrs(%options);
+ $cdr_search->limit(1000);
+ $cdr_search->increment(0);
+ while ( my $cdr = $cdr_search->fetch ) {
+
if ( $DEBUG > 1 ) {
warn "rating CDR $cdr\n".
join('', map { " $_ => ". $cdr->{$_}. "\n" } keys %$cdr );
@@ -173,6 +175,8 @@ sub calc_usage {
$total += $charge_min;
+ $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+
} # $cdr
} # $pass
@@ -213,9 +217,10 @@ sub calc_usage {
# tell the formatter what we're sending it
$formatter->inbound($options{'inbound'});
- foreach my $cdr (
- $svc_x->get_cdrs( %options )
- ) {
+ my $cdr_search = $svc_x->psearch_cdrs(%options);
+ $cdr_search->limit(1000);
+ $cdr_search->increment(0);
+ while ( my $cdr = $cdr_search->fetch ) {
my $object = $options{'inbound'}
? $cdr->cdr_termination( 1 ) #1: inbound
@@ -242,6 +247,8 @@ sub calc_usage {
$formatter->append($cdr);
+ $cdr_search->adjust(1) if $cdr->freesidestatus eq 'processing-tiered';
+
} # $cdr
} # $pass
diff --git a/FS/FS/svc_pbx.pm b/FS/FS/svc_pbx.pm
index f8b9605..4182a13 100644
--- a/FS/FS/svc_pbx.pm
+++ b/FS/FS/svc_pbx.pm
@@ -3,6 +3,7 @@ package FS::svc_pbx;
use strict;
use base qw( FS::svc_External_Common );
use FS::Record qw( qsearch qsearchs dbh );
+use FS::PagedSearch qw( psearch );
use FS::Conf;
use FS::cust_svc;
use FS::svc_phone;
@@ -259,11 +260,13 @@ sub _check_duplicate {
return '';
}
-=item get_cdrs
+=item psearch_cdrs OPTIONS
-Returns a set of Call Detail Records (see L<FS::cdr>) associated with this
-service. By default, "associated with" means that the "charged_party" field of
-the CDR matches the "title" field of the service.
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records
+associated with this service. By default, "associated with" means that
+the "charged_party" field of the CDR matches the "title" field of the
+service. To access the CDRs themselves, call "->fetch" on the resulting
+object.
=over 2
@@ -295,7 +298,7 @@ to allow title to indicate a range of IP addresses.
=cut
-sub get_cdrs {
+sub psearch_cdrs {
my($self, %options) = @_;
my %hash = ();
my @where = ();
@@ -343,15 +346,26 @@ sub get_cdrs {
my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where )
if @where;
- my @cdrs =
- qsearch( {
+ psearch( {
'table' => 'cdr',
'hashref' => \%hash,
'extra_sql' => $extra_sql,
'order_by' => "ORDER BY startdate $for_update",
- } );
+ } );
+}
+
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a
+single list. Arguments are the same as for psearch_cdrs. This can take
+an unreasonably large amount of memory and is best avoided.
- @cdrs;
+=cut
+
+sub get_cdrs {
+  my $self = shift;
+  # Forward the caller's options (@_) to psearch_cdrs.  Passing $_ here
+  # would hand over whatever the global topic variable happens to hold
+  # and silently drop every option the caller supplied.
+  my $psearch = $self->psearch_cdrs(@_);
+  # Run the prepared query directly; no LIMIT/OFFSET has been added to it
+  # yet, so this returns the full result list at once.
+  qsearch ( $psearch->{query} )
+}
=back
diff --git a/FS/FS/svc_phone.pm b/FS/FS/svc_phone.pm
index b395ea6..1296c1e 100644
--- a/FS/FS/svc_phone.pm
+++ b/FS/FS/svc_phone.pm
@@ -7,6 +7,7 @@ use Data::Dumper;
use Scalar::Util qw( blessed );
use FS::Conf;
use FS::Record qw( qsearch qsearchs dbh );
+use FS::PagedSearch qw( psearch );
use FS::Msgcat qw(gettext);
use FS::part_svc;
use FS::phone_device;
@@ -648,11 +649,13 @@ sub cust_location_or_main {
$cust_pkg ? $cust_pkg->cust_location_or_main : '';
}
-=item get_cdrs
+=item psearch_cdrs OPTIONS
-Returns a set of Call Detail Records (see L<FS::cdr>) associated with this
-service. By default, "associated with" means that either the "src" or the
-"charged_party" field of the CDR matches the "phonenum" field of the service.
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records
+associated with this service. By default, "associated with" means that
+either the "src" or the "charged_party" field of the CDR matches the
+"phonenum" field of the service. To access the CDRs themselves, call
+"->fetch" on the resulting object.
=over 2
@@ -676,11 +679,16 @@ with the chosen prefix.
=item by_svcnum: not supported for svc_phone
+=item billsec_sum: Instead of returning all of the CDRs, return a single
+record (as an L<FS::cdr> object) with the sum of the 'billsec' field over
+the entire result set.
+
=back
=cut
-sub get_cdrs {
+sub psearch_cdrs {
+
my($self, %options) = @_;
my @fields;
my %hash;
@@ -739,18 +747,30 @@ sub get_cdrs {
my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where );
- my @cdrs =
- qsearch( {
+ psearch( {
'table' => 'cdr',
'hashref' => \%hash,
'extra_sql' => $extra_sql,
'order_by' => $options{'billsec_sum'} ? '' : "ORDER BY startdate $for_update",
'select' => $options{'billsec_sum'} ? 'sum(billsec) as billsec_sum' : '*',
- } );
+ } );
+}
+
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a
+single list. Arguments are the same as for psearch_cdrs. This can take
+an unreasonably large amount of memory and is best avoided.
- @cdrs;
+=cut
+
+sub get_cdrs {
+  my ($self, @options) = @_;
+  # Build the paged search only to borrow its prepared query, then run
+  # that query in one go; no LIMIT/OFFSET has been added to it yet.
+  my $query = $self->psearch_cdrs(@options)->{query};
+  qsearch( $query );
+}
+
=back
=head1 BUGS