1 # Copyright (C) 2005 Stanislav Sinyagin
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
17 # $Id: ExternalStorage.pm,v 1.1 2010-12-27 00:03:57 ivan Exp $
18 # Stanislav Sinyagin <ssinyagin@yahoo.com>
20 package Torrus::Collector::ExternalStorage;
22 use Torrus::ConfigTree;
29 # Pluggable backend module implements all storage-specific tasks
32 eval( 'require ' . $Torrus::Collector::ExternalStorage::backend );
36 # These variables must be set by the backend module
38 our $backendOpenSession;
39 our $backendStoreData;
40 our $backendCloseSession;
42 # Register the storage type
43 $Torrus::Collector::storageTypes{'ext'} = 1;
46 # List of needed parameters and default values
48 $Torrus::Collector::params{'ext-storage'} = {
52 'ext-counter-max' => undef},
54 'ext-counter-max' => undef}},
55 'ext-service-id' => undef
61 $Torrus::Collector::initTarget{'ext-storage'} =
62 \&Torrus::Collector::ExternalStorage::initTarget;
66 my $collector = shift;
69 my $sref = $collector->storageData( 'ext' );
71 $collector->registerDeleteCallback
72 ( $token, \&Torrus::Collector::ExternalStorage::deleteTarget );
75 $collector->param($token, 'ext-service-id');
77 if( defined( $sref->{'serviceid'}{$serviceid} ) )
79 Error('ext-service-id is not unique: "' . $serviceid .
80 '". External storage is not activated for ' .
81 $collector->path($token));
85 $sref->{'serviceid'}{$serviceid} = 1;
88 my $dstype = $collector->param($token, 'ext-dstype');
89 if( $dstype eq 'GAUGE' )
91 $processor = \&Torrus::Collector::ExternalStorage::processGauge;
95 if( $dstype eq 'COUNTER32' )
98 \&Torrus::Collector::ExternalStorage::processCounter32;
103 \&Torrus::Collector::ExternalStorage::processCounter64;
106 my $max = $collector->param( $token, 'ext-counter-max' );
107 if( defined( $max ) )
109 $sref->{'max'}{$token} = Math::BigFloat->new($max);
113 $sref->{'tokens'}{$token} = $processor;
115 &{$backendInit}( $collector, $token );
120 $Torrus::Collector::setValue{'ext'} =
121 \&Torrus::Collector::ExternalStorage::setValue;
126 my $collector = shift;
129 my $timestamp = shift;
131 my $sref = $collector->storageData( 'ext' );
133 my $prevTimestamp = $sref->{'prevTimestamp'}{$token};
134 if( not defined( $prevTimestamp ) )
136 $prevTimestamp = $timestamp;
140 &{$sref->{'tokens'}{$token}}( $collector, $token, $value, $timestamp );
141 if( defined( $procvalue ) )
143 if( ref( $procvalue ) )
145 # Convert a BigFloat into a scientific notation string
146 $procvalue = $procvalue->bsstr();
148 $sref->{'values'}{$token} =
149 [$procvalue, $timestamp, $timestamp - $prevTimestamp];
152 $sref->{'prevTimestamp'}{$token} = $timestamp;
158 my $collector = shift;
161 my $timestamp = shift;
169 my $collector = shift;
172 my $timestamp = shift;
174 return processCounter( 32, $collector, $token, $value, $timestamp );
179 my $collector = shift;
182 my $timestamp = shift;
184 return processCounter( 64, $collector, $token, $value, $timestamp );
187 my $base32 = Math::BigInt->new(2)->bpow(32);
188 my $base64 = Math::BigInt->new(2)->bpow(64);
193 my $collector = shift;
196 my $timestamp = shift;
198 my $sref = $collector->storageData( 'ext' );
202 Debug('ExternalStorage::processCounter: token=' . $token .
203 ' value=' . $value . ' timestamp=' . $timestamp);
208 # the agent rebooted, so we flush the counter
209 delete $sref->{'prevCounter'}{$token};
213 $value = Math::BigInt->new( $value );
216 if( exists( $sref->{'prevCounter'}{$token} ) )
218 my $prevValue = $sref->{'prevCounter'}{$token};
219 my $prevTimestamp = $sref->{'prevTimestamp'}{$token};
222 Debug('ExternalStorage::processCounter: prevValue=' . $prevValue .
223 ' prevTimestamp=' . $prevTimestamp);
226 if( $prevValue->bcmp( $value ) > 0 ) # previous is bigger
228 $ret = Math::BigFloat->new($base==32 ? $base32:$base64);
229 $ret->bsub( $prevValue );
230 $ret->badd( $value );
234 $ret = Math::BigFloat->new( $value );
235 $ret->bsub( $prevValue );
237 $ret->bdiv( $timestamp - $prevTimestamp );
238 if( defined( $sref->{'max'}{$token} ) )
240 if( $ret->bcmp( $sref->{'max'}{$token} ) > 0 )
242 Debug('Resulting counter rate is above the maximum');
248 $sref->{'prevCounter'}{$token} = $value;
250 if( defined( $ret ) and isDebug() )
252 Debug('ExternalStorage::processCounter: Resulting value=' . $ret);
259 $Torrus::Collector::storeData{'ext'} =
260 \&Torrus::Collector::ExternalStorage::storeData;
262 # timestamp of last unavailable storage
263 my $storageUnavailable = 0;
265 # Last time we tried to reach it
266 my $storageLastTry = 0;
268 # how often we retry - configurable in torrus-config.pl
269 our $unavailableRetry;
271 # maximum age for backlog in case of unavailable storage.
272 # We stop recording new data when maxage is reached.
277 my $collector = shift;
280 &Torrus::DB::checkInterrupted();
282 my $nTokens = scalar( keys %{$sref->{'values'}} );
289 Verbose('Exporting data to external storage for ' .
290 $nTokens . ' tokens');
291 &{$backendOpenSession}();
293 while( my($token, $valuetriple) = each( %{$sref->{'values'}} ) )
295 &Torrus::DB::checkInterrupted();
297 my( $value, $timestamp, $interval ) = @{$valuetriple};
299 $collector->param($token, 'ext-service-id');
303 if( $storageUnavailable > 0 and
304 time() < $storageLastTry + $unavailableRetry )
310 $storageUnavailable = 0;
311 $storageLastTry = time();
313 if( exists( $sref->{'backlog'} ) )
315 # Try to flush the backlog first
316 Verbose('Trying to flush the backlog');
319 while( scalar(@{$sref->{'backlog'}}) > 0 and $ok )
321 my $quarter = shift @{$sref->{'backlog'}};
322 if( not &{$backendStoreData}( @{$quarter} ) )
324 Warn('Unable to flush the backlog, external ' .
325 'storage is unavailable');
327 unshift( @{$sref->{'backlog'}}, $quarter );
334 delete( $sref->{'backlog'} );
335 Verbose('Backlog is successfully flushed');
341 if( not &{$backendStoreData}( $timestamp, $serviceid,
342 $value, $interval ) )
344 Warn('Unable to store data, external storage is ' .
345 'unavailable. Saving data to backlog');
354 if( $storageUnavailable == 0 )
356 $storageUnavailable = time();
359 if( not exists( $sref->{'backlog'} ) )
361 $sref->{'backlog'} = [];
362 $sref->{'backlogStart'} = time();
365 if( time() < $sref->{'backlogStart'} + $backlogMaxAge )
367 push( @{$sref->{'backlog'}},
368 [ $timestamp, $serviceid, $value, $interval ] );
372 Error('Backlog has reached its maximum age, stopped storing ' .
378 undef $sref->{'values'};
379 &{$backendCloseSession}();
386 # Callback executed by Collector
390 my $collector = shift;
393 my $sref = $collector->storageData( 'ext' );
396 $collector->param($token, 'ext-service-id');
397 delete $sref->{'serviceid'}{$serviceid};
399 if( defined( $sref->{'prevCounter'}{$token} ) )
401 delete $sref->{'prevCounter'}{$token};
404 delete $sref->{'tokens'}{$token};
413 # indent-tabs-mode: nil
414 # perl-indent-level: 4