import torrus 1.0.9
[freeside.git] / torrus / perllib / Torrus / Collector / ExternalStorage.pm
1 #  Copyright (C) 2005  Stanislav Sinyagin
2 #
3 #  This program is free software; you can redistribute it and/or modify
4 #  it under the terms of the GNU General Public License as published by
5 #  the Free Software Foundation; either version 2 of the License, or
6 #  (at your option) any later version.
7 #
8 #  This program is distributed in the hope that it will be useful,
9 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 #  GNU General Public License for more details.
12 #
13 #  You should have received a copy of the GNU General Public License
14 #  along with this program; if not, write to the Free Software
15 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
16
17 # $Id: ExternalStorage.pm,v 1.1 2010-12-27 00:03:57 ivan Exp $
18 # Stanislav Sinyagin <ssinyagin@yahoo.com>
19
20 package Torrus::Collector::ExternalStorage;
21
22 use Torrus::ConfigTree;
23 use Torrus::Log;
24
25 use strict;
26 use Math::BigInt;
27 use Math::BigFloat;
28
29 # Pluggable backend module implements all storage-specific tasks
30 BEGIN
31 {
32     eval( 'require ' . $Torrus::Collector::ExternalStorage::backend );
33     die( $@ ) if $@;    
34 }
35
36 # These variables must be set by the backend module
37 our $backendInit;
38 our $backendOpenSession;
39 our $backendStoreData;
40 our $backendCloseSession;
41
42 # Register the storage type
43 $Torrus::Collector::storageTypes{'ext'} = 1;
44
45
46 # List of needed parameters and default values
47
48 $Torrus::Collector::params{'ext-storage'} = {
49     'ext-dstype' => {
50         'GAUGE' => undef,
51         'COUNTER32' => {
52             'ext-counter-max' => undef},
53         'COUNTER64' => {
54             'ext-counter-max' => undef}},
55     'ext-service-id' => undef
56     };
57
58
59
60
61 $Torrus::Collector::initTarget{'ext-storage'} =
62     \&Torrus::Collector::ExternalStorage::initTarget;
63
64 sub initTarget
65 {
66     my $collector = shift;
67     my $token = shift;
68
69     my $sref = $collector->storageData( 'ext' );
70
71     $collector->registerDeleteCallback
72         ( $token, \&Torrus::Collector::ExternalStorage::deleteTarget );
73
74     my $serviceid =
75         $collector->param($token, 'ext-service-id');
76
77     if( defined( $sref->{'serviceid'}{$serviceid} ) )
78     {
79         Error('ext-service-id is not unique: "' . $serviceid .
80               '". External storage is not activated for ' .
81               $collector->path($token));
82         return;
83     }
84
85     $sref->{'serviceid'}{$serviceid} = 1;
86
87     my $processor;
88     my $dstype = $collector->param($token, 'ext-dstype');
89     if( $dstype eq 'GAUGE' )
90     {
91         $processor = \&Torrus::Collector::ExternalStorage::processGauge;
92     }
93     else
94     {
95         if( $dstype eq 'COUNTER32' )
96         {
97             $processor =
98                 \&Torrus::Collector::ExternalStorage::processCounter32;
99         }
100         else
101         {
102             $processor =
103                 \&Torrus::Collector::ExternalStorage::processCounter64;
104         }
105         
106         my $max = $collector->param( $token, 'ext-counter-max' );
107         if( defined( $max ) )
108         {
109             $sref->{'max'}{$token} = Math::BigFloat->new($max);
110         }
111     }
112
113     $sref->{'tokens'}{$token} = $processor;
114
115     &{$backendInit}( $collector, $token );
116 }
117
118
119
120 $Torrus::Collector::setValue{'ext'} =
121     \&Torrus::Collector::ExternalStorage::setValue;
122
123
124 sub setValue
125 {
126     my $collector = shift;
127     my $token = shift;
128     my $value = shift;
129     my $timestamp = shift;
130
131     my $sref = $collector->storageData( 'ext' );
132
133     my $prevTimestamp = $sref->{'prevTimestamp'}{$token};
134     if( not defined( $prevTimestamp ) )
135     {
136         $prevTimestamp = $timestamp;
137     }
138         
139     my $procvalue =
140         &{$sref->{'tokens'}{$token}}( $collector, $token, $value, $timestamp );
141     if( defined( $procvalue ) )
142     {
143         if( ref( $procvalue ) )
144         {
145             # Convert a BigFloat into a scientific notation string
146             $procvalue = $procvalue->bsstr();
147         }
148         $sref->{'values'}{$token} =
149             [$procvalue, $timestamp, $timestamp - $prevTimestamp];
150     }
151     
152     $sref->{'prevTimestamp'}{$token} = $timestamp;
153 }
154
155
156 sub processGauge
157 {
158     my $collector = shift;
159     my $token = shift;
160     my $value = shift;
161     my $timestamp = shift;
162
163     return $value;
164 }
165
166
167 sub processCounter32
168 {
169     my $collector = shift;
170     my $token = shift;
171     my $value = shift;
172     my $timestamp = shift;
173
174     return processCounter( 32, $collector, $token, $value, $timestamp );
175 }
176
177 sub processCounter64
178 {
179     my $collector = shift;
180     my $token = shift;
181     my $value = shift;
182     my $timestamp = shift;
183
184     return processCounter( 64, $collector, $token, $value, $timestamp );
185 }
186
187 my $base32 = Math::BigInt->new(2)->bpow(32);
188 my $base64 = Math::BigInt->new(2)->bpow(64);
189
190 sub processCounter
191 {
192     my $base = shift;
193     my $collector = shift;
194     my $token = shift;
195     my $value = shift;
196     my $timestamp = shift;
197
198     my $sref = $collector->storageData( 'ext' );
199
200     if( isDebug() )
201     {
202         Debug('ExternalStorage::processCounter: token=' . $token .
203               ' value=' . $value . ' timestamp=' . $timestamp);
204     }
205
206     if( $value eq 'U' )
207     {
208         # the agent rebooted, so we flush the counter
209         delete $sref->{'prevCounter'}{$token};
210         return undef;
211     }
212         
213     $value = Math::BigInt->new( $value );
214     my $ret;
215     
216     if( exists( $sref->{'prevCounter'}{$token} ) )
217     {
218         my $prevValue = $sref->{'prevCounter'}{$token};
219         my $prevTimestamp = $sref->{'prevTimestamp'}{$token};
220         if( isDebug() )
221         {
222             Debug('ExternalStorage::processCounter: prevValue=' . $prevValue .
223                   ' prevTimestamp=' . $prevTimestamp);
224         }
225         
226         if( $prevValue->bcmp( $value ) > 0 ) # previous is bigger
227         {
228             $ret = Math::BigFloat->new($base==32 ? $base32:$base64);
229             $ret->bsub( $prevValue );
230             $ret->badd( $value );
231         }
232         else
233         {
234             $ret = Math::BigFloat->new( $value );
235             $ret->bsub( $prevValue );
236         }
237         $ret->bdiv( $timestamp - $prevTimestamp );
238         if( defined( $sref->{'max'}{$token} ) )
239         {
240             if( $ret->bcmp( $sref->{'max'}{$token} ) > 0 )
241             {
242                 Debug('Resulting counter rate is above the maximum');
243                 $ret = undef;
244             }
245         }
246     }
247
248     $sref->{'prevCounter'}{$token} = $value;
249
250     if( defined( $ret ) and isDebug() )
251     {
252         Debug('ExternalStorage::processCounter: Resulting value=' . $ret);
253     }
254     return $ret;
255 }
256
257
258
259 $Torrus::Collector::storeData{'ext'} =
260     \&Torrus::Collector::ExternalStorage::storeData;
261
262 # timestamp of last unavailable storage
263 my $storageUnavailable = 0;
264
265 # Last time we tried to reach it
266 my $storageLastTry = 0;
267
268 # how often we retry - configurable in torrus-config.pl
269 our $unavailableRetry;
270
271 # maximum age for backlog in case of unavailable storage.
272 # We stop recording new data when maxage is reached.
273 our $backlogMaxAge;
274
275 sub storeData
276 {
277     my $collector = shift;
278     my $sref = shift;
279
280     &Torrus::DB::checkInterrupted();
281
282     my $nTokens = scalar( keys %{$sref->{'values'}} );
283
284     if( $nTokens == 0 )
285     {
286         return;
287     }
288     
289     Verbose('Exporting data to external storage for ' .
290             $nTokens . ' tokens');
291     &{$backendOpenSession}();
292     
293     while( my($token, $valuetriple) = each( %{$sref->{'values'}} ) )
294     {
295         &Torrus::DB::checkInterrupted();
296         
297         my( $value, $timestamp, $interval ) = @{$valuetriple};
298         my $serviceid =
299             $collector->param($token, 'ext-service-id');
300         
301         my $toBacklog = 0;
302         
303         if( $storageUnavailable > 0 and 
304             time() < $storageLastTry + $unavailableRetry )
305         {
306             $toBacklog = 1;
307         }
308         else
309         {
310             $storageUnavailable = 0;
311             $storageLastTry = time();
312             
313             if( exists( $sref->{'backlog'} ) )
314             {
315                 # Try to flush the backlog first
316                 Verbose('Trying to flush the backlog');
317                     
318                 my $ok = 1;
319                 while( scalar(@{$sref->{'backlog'}}) > 0 and $ok )
320                 {
321                     my $quarter = shift @{$sref->{'backlog'}};
322                     if( not &{$backendStoreData}( @{$quarter} ) )
323                     {
324                         Warn('Unable to flush the backlog, external ' .
325                              'storage is unavailable');
326                         
327                         unshift( @{$sref->{'backlog'}}, $quarter );
328                         $ok = 0;
329                         $toBacklog = 1;
330                     }
331                 }
332                 if( $ok )
333                 {
334                     delete( $sref->{'backlog'} );
335                     Verbose('Backlog is successfully flushed');
336                 }                    
337             }
338             
339             if( not $toBacklog )
340             {
341                 if( not &{$backendStoreData}( $timestamp, $serviceid,
342                                               $value, $interval ) )
343                 {
344                     Warn('Unable to store data, external storage is ' .
345                          'unavailable. Saving data to backlog');
346                     
347                     $toBacklog = 1;                    
348                 }
349             }
350         }
351         
352         if( $toBacklog )
353         {
354             if( $storageUnavailable == 0 )
355             {
356                 $storageUnavailable = time();
357             }
358             
359             if( not exists( $sref->{'backlog'} ) )
360             {
361                 $sref->{'backlog'} = [];
362                 $sref->{'backlogStart'} = time();
363             }
364             
365             if( time() < $sref->{'backlogStart'} + $backlogMaxAge )
366             {
367                 push( @{$sref->{'backlog'}},
368                       [ $timestamp, $serviceid, $value, $interval ] );
369             }
370             else
371             {
372                 Error('Backlog has reached its maximum age, stopped storing ' .
373                       'any more data');
374             }
375         }
376     }    
377     
378     undef $sref->{'values'};
379     &{$backendCloseSession}();
380 }
381
382
383
384
385
386 # Callback executed by Collector
387
388 sub deleteTarget
389 {
390     my $collector = shift;
391     my $token = shift;
392
393     my $sref = $collector->storageData( 'ext' );
394
395     my $serviceid =
396         $collector->param($token, 'ext-service-id');
397     delete $sref->{'serviceid'}{$serviceid};
398
399     if( defined( $sref->{'prevCounter'}{$token} ) )
400     {
401         delete $sref->{'prevCounter'}{$token};
402     }
403     
404     delete $sref->{'tokens'}{$token};
405 }
406
407
408 1;
409
410
411 # Local Variables:
412 # mode: perl
413 # indent-tabs-mode: nil
414 # perl-indent-level: 4
415 # End: