add customer status to advanced invoice report, RT#29810
[freeside.git] / FS / FS / Cursor.pm
1 package FS::Cursor;
2
3 use strict;
4 use vars qw($DEBUG $buffer);
5 use FS::Record;
6 use FS::UID qw(myconnect driver_name);
7 use Scalar::Util qw(refaddr);
8
9 $DEBUG = 2;
10
11 # this might become a parameter at some point, but right now, you can
12 # "local $FS::Cursor::buffer = X;"
13 $buffer = 200;
14
15 =head1 NAME
16
17 FS::Cursor - Iterator for querying large data sets
18
19 =head1 SYNOPSIS
20
21 use FS::Cursor;
22
23 my $search = FS::Cursor->new('table', { field => 'value' ... });
24 while ( my $row = $search->fetch ) {
25 ...
26 }
27
28 =head1 CLASS METHODS
29
30 =over 4
31
32 =item new ARGUMENTS
33
34 Constructs a cursored search.  Accepts all the same arguments as qsearch,
35 and returns an FS::Cursor object to fetch the rows one at a time.
36
37 =cut
38
39 sub new {
40   my $class = shift;
41   my $q = FS::Record::_query(@_); # builds the statement and parameter list
42   my $dbh;
43
44   my $self = {
45     query => $q,
46     class => 'FS::' . ($q->{table} || 'Record'),
47     buffer => [],
48     position => 0, # for mysql
49   };
50   bless $self, $class;
51
52   # the class of record object to return
53   $self->{class} = "FS::".($q->{table} || 'Record');
54
55   # save for later, so forked children will not destroy me when they exit
56   $self->{pid} = $$;
57
58   $self->{id} = sprintf('cursor%08x', refaddr($self));
59
60   my $statement;
61   if ( driver_name() eq 'Pg' ) {
62     $self->{dbh} = $dbh = myconnect();
63     $statement = "DECLARE ".$self->{id}." CURSOR FOR ".$q->{statement};
64   } elsif ( driver_name() eq 'mysql' ) {
65     # build a cursor from scratch
66     #
67     #
68     # there are problems doing it this way, and we don't have time to resolve
69     # them all right now...
70     #$statement = "CREATE TEMPORARY TABLE $self->{id} 
71     #  (rownum INT AUTO_INCREMENT, PRIMARY KEY (rownum))
72     #  $q->{statement}";
73
74     # one of those problems is locking, so keep everything on the main session
75     $self->{dbh} = $dbh = FS::UID::dbh();
76     $statement = $q->{statement};
77   }
78
79   my $sth = $dbh->prepare($statement)
80     or die $dbh->errstr;
81   my $bind = 1;
82   foreach my $value ( @{ $q->{value} } ) {
83     my $bind_type = shift @{ $q->{bind_type} };
84     $sth->bind_param($bind++, $value, $bind_type );
85   }
86
87   $sth->execute or die $sth->errstr;
88
89   if ( driver_name() eq 'Pg' ) {
90     $self->{fetch} = $dbh->prepare("FETCH FORWARD $buffer FROM ".$self->{id});
91   } elsif ( driver_name() eq 'mysql' ) {
92     # make sure we're not holding any locks on the tables mentioned
93     # in the query
94     #$dbh->commit if driver_name() eq 'mysql';
95     #$self->{fetch} = $dbh->prepare("SELECT * FROM $self->{id} ORDER BY rownum LIMIT ?, $buffer");
96
97     # instead, fetch all the rows at once
98     $self->{buffer} = $sth->fetchall_arrayref( {} );
99   }
100
101   $self;
102 }
103
104 =back
105
106 =head1 METHODS
107
108 =over 4
109
110 =item fetch
111
112 Fetch the next row from the search results.
113
114 =cut
115
116 sub fetch {
117   # might be a little more efficient to do a FETCH NEXT 1000 or something
118   # and buffer them locally, but the semantics are simpler this way
119   my $self = shift;
120   if (@{ $self->{buffer} } == 0) {
121     my $rows = $self->refill;
122     return undef if !$rows;
123   }
124   $self->{class}->new(shift @{ $self->{buffer} });
125 }
126
127 sub refill {
128   my $self = shift;
129   if (driver_name() eq 'Pg') {
130     my $sth = $self->{fetch};
131     $sth->bind_param(1, $self->{position}) if driver_name() eq 'mysql';
132     $sth->execute or die $sth->errstr;
133     my $result = $self->{fetch}->fetchall_arrayref( {} );
134     $self->{buffer} = $result;
135     $self->{position} += $sth->rows;
136     scalar @$result;
137   } # mysql can't be refilled, since everything is buffered from the start
138 }
139
140 sub DESTROY {
141   my $self = shift;
142   return if driver_name() eq 'mysql';
143
144   return unless $self->{pid} eq $$;
145   $self->{dbh}->do('CLOSE '. $self->{id})
146     or die $self->{dbh}->errstr; # clean-up the cursor in Pg
147   $self->{dbh}->rollback;
148   $self->{dbh}->disconnect;
149 }
150
151 =back
152
153 =head1 TO DO
154
155 Replace all uses of qsearch with this.
156
157 =head1 BUGS
158
159 Still doesn't really support MySQL, but it pretends it does, by simply
160 running the query and returning records one at a time.
161
162 The cursor will close prematurely if any code issues a rollback/commit. If
163 you need protection against this use qsearch or fork and get a new dbh
164 handle.
165 Normally this issue will represent itself this message.
166 ERROR: cursor "cursorXXXXXXX" does not exist.
167
168 =head1 SEE ALSO
169
170 L<FS::Record>
171
172 =cut
173
174 1;