1 # Copyright (C) 2002-2007 Stanislav Sinyagin
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
17 # $Id: RRDStorage.pm,v 1.1 2010-12-27 00:03:58 ivan Exp $
18 # Stanislav Sinyagin <ssinyagin@yahoo.com>
20 package Torrus::Collector::RRDStorage;
22 use Torrus::ConfigTree;
29 our $threadsInUse = 0;
33 # RRDtool is not reentrant. use this semaphore for every call to RRDs::*
34 our $rrdtoolSemaphore;
40 # Register the storage type
41 $Torrus::Collector::storageTypes{'rrd'} = 1;
44 # List of needed parameters and default values
46 $Torrus::Collector::params{'rrd-storage'} = {
49 'rrd-create-rra' => undef,
50 'rrd-create-heartbeat' => undef,
51 'rrd-create-min' => 'U',
52 'rrd-create-max' => 'U',
55 'rrd-create-hw-alpha' => 0.1,
56 'rrd-create-hw-beta' => 0.0035,
57 'rrd-create-hw-gamma' => 0.1,
58 'rrd-create-hw-winlen' => 9,
59 'rrd-create-hw-failth' => 6,
60 'rrd-create-hw-season' => 288,
61 'rrd-create-hw-rralen' => undef },
62 'disabled' => undef },
63 'rrd-create-dstype' => undef,
68 $Torrus::Collector::initThreadsHandlers{'rrd-storage'} =
69 \&Torrus::Collector::RRDStorage::initThreads;
73 if( $useThreads and not defined( $thrUpdateThread ) )
75 Verbose('RRD storage is configured for multithreading. Initializing ' .
76 'the background thread');
78 require threads::shared;
79 require Thread::Queue;
80 require Thread::Semaphore;
82 $thrUpdateQueue = new Thread::Queue;
83 $thrErrorsQueue = new Thread::Queue;
84 $rrdtoolSemaphore = new Thread::Semaphore;
86 $thrUpdateThread = threads->create( \&rrdUpdateThread );
87 $thrUpdateThread->detach();
94 $Torrus::Collector::initTarget{'rrd-storage'} =
95 \&Torrus::Collector::RRDStorage::initTarget;
99 my $collector = shift;
102 my $sref = $collector->storageData( 'rrd' );
104 $collector->registerDeleteCallback
105 ( $token, \&Torrus::Collector::RRDStorage::deleteTarget );
108 $collector->param($token, 'data-dir') . '/' .
109 $collector->param($token, 'data-file');
111 $sref->{'byfile'}{$filename}{$token} = 1;
112 $sref->{'filename'}{$token} = $filename;
117 $Torrus::Collector::setValue{'rrd'} =
118 \&Torrus::Collector::RRDStorage::setValue;
123 my $collector = shift;
126 my $timestamp = shift;
129 my $sref = $collector->storageData( 'rrd' );
131 $sref->{'values'}{$token} = [$value, $timestamp, $uptime];
135 $Torrus::Collector::storeData{'rrd'} =
136 \&Torrus::Collector::RRDStorage::storeData;
140 my $collector = shift;
145 $collector->setStatValue( 'RRDQueue', $thrUpdateQueue->pending() );
148 if( $threadsInUse and $thrUpdateQueue->pending() > $thrQueueLimit )
150 Error('Cannot enqueue RRD files for updating: ' .
151 'queue size is above limit');
155 while( my ($filename, $tokens) = each %{$sref->{'byfile'}} )
157 &Torrus::DB::checkInterrupted();
159 if( not -e $filename )
161 createRRD( $collector, $sref, $filename, $tokens );
166 updateRRD( $collector, $sref, $filename, $tokens );
171 delete $sref->{'values'};
179 $rrdtoolSemaphore->down();
187 $rrdtoolSemaphore->up();
194 my $collector = shift;
196 my $filename = shift;
199 # We use hashes here, in order to make the superset of RRA
200 # definitions, and unique RRD names
204 # Holt-Winters parameters
208 my $timestamp = time();
210 foreach my $token ( keys %{$tokens} )
213 sprintf('DS:%s:%s:%d:%s:%s',
214 $collector->param($token, 'rrd-ds'),
215 $collector->param($token, 'rrd-create-dstype'),
216 $collector->param($token, 'rrd-create-heartbeat'),
217 $collector->param($token, 'rrd-create-min'),
218 $collector->param($token, 'rrd-create-max'));
219 $DS_hash{$ds_string} = 1;
221 foreach my $rra_string
222 ( split(/\s+/, $collector->param($token, 'rrd-create-rra')) )
224 $RRA_hash{$rra_string} = 1;
227 if( $collector->param($token, 'rrd-hwpredict') eq 'enabled' )
231 foreach my $param ( 'alpha', 'beta', 'gamma', 'winlen', 'failth',
234 my $value = $collector->param($token, 'rrd-create-hw-'.$param);
236 if( defined( $hwparam{$param} ) and
237 $hwparam{$param} != $value )
239 my $paramname = 'rrd-create-hw-'.$param;
240 Warn("Parameter " . $paramname . " was already defined " .
241 "with differentr value for " . $filename);
244 $hwparam{$param} = $value;
248 if( ref $sref->{'values'}{$token} )
250 my $new_ts = $sref->{'values'}{$token}[1];
251 if( $new_ts > 0 and $new_ts < $timestamp )
253 $timestamp = $new_ts;
258 my @DS = sort keys %DS_hash;
259 my @RRA = sort keys %RRA_hash;
263 ## Define the RRAs for Holt-Winters prediction
265 my $hwpredict_rran = scalar(@RRA) + 1;
266 my $seasonal_rran = $hwpredict_rran + 1;
267 my $devseasonal_rran = $hwpredict_rran + 2;
268 my $devpredict_rran = $hwpredict_rran + 3;
269 my $failures_rran = $hwpredict_rran + 4;
271 push( @RRA, sprintf('RRA:HWPREDICT:%d:%e:%e:%d:%d',
278 push( @RRA, sprintf('RRA:SEASONAL:%d:%e:%d',
283 push( @RRA, sprintf('RRA:DEVSEASONAL:%d:%e:%d',
288 push( @RRA, sprintf('RRA:DEVPREDICT:%d:%d',
292 push( @RRA, sprintf('RRA:FAILURES:%d:%d:%d:%d',
299 my $step = $collector->period();
300 my $start = $timestamp - $step;
302 my @OPT = ( sprintf( '--start=%d', $start ),
303 sprintf( '--step=%d', $step ) );
305 &Torrus::DB::checkInterrupted();
307 Debug("Creating RRD $filename: " . join(" ", @OPT, @DS, @RRA));
311 RRDs::create($filename,
316 my $err = RRDs::error();
320 Error("ERROR creating $filename: $err") if $err;
322 delete $sref->{'rrdinfo_ds'}{$filename};
328 my $collector = shift;
330 my $filename = shift;
333 if( not defined( $sref->{'rrdinfo_ds'}{$filename} ) )
336 $sref->{'rrdinfo_ds'}{$filename} = $ref;
340 my $rrdinfo = RRDs::info( $filename );
344 foreach my $prop ( keys %$rrdinfo )
346 if( $prop =~ /^ds\[(\S+)\]\./o )
352 &Torrus::DB::checkInterrupted();
355 # First we compare the sets of datasources in our memory and in RRD file
356 my %ds_updating = ();
359 foreach my $token ( keys %{$tokens} )
361 $ds_updating{ $collector->param($token, 'rrd-ds') } = $token;
364 # Check if we update all datasources in RRD file
365 foreach my $ds ( keys %{$sref->{'rrdinfo_ds'}{$filename}} )
367 if( not $ds_updating{$ds} )
369 Warn('Datasource exists in RRD file, but it is not updated: ' .
370 $ds . ' in ' . $filename);
375 # Check if all DS that we update are defined in RRD
376 foreach my $ds ( keys %ds_updating )
378 if( not $sref->{'rrdinfo_ds'}{$filename}{$ds} )
380 Error("Datasource being updated does not exist: $ds in $filename");
381 delete $ds_updating{$ds};
386 if( $ds_conflict and $moveConflictRRD )
388 if( not -f $filename )
390 Error($filename . 'is not a regular file');
394 my( $sec, $min, $hour, $mday, $mon, $year) = localtime( time() );
395 my $destfile = sprintf('%s_%04d%02d%02d%02d%02d',
397 $year + 1900, $mon+1, $mday, $hour, $min);
399 my $destdir = $conflictRRDPath;
400 if( defined( $destdir ) and -d $destdir )
402 my @fpath = split('/', $destfile);
403 my $fname = pop( @fpath );
404 $destfile = $destdir . '/' . $fname;
407 Warn('Moving the conflicted RRD file ' . $filename .
409 rename( $filename, $destfile ) or
410 Error("Cannot rename $filename to $destfile: $!");
412 delete $sref->{'rrdinfo_ds'}{$filename};
414 createRRD( $collector, $sref, $filename, $tokens );
417 if( scalar( keys %ds_updating ) == 0 )
419 Error("No datasources to update in $filename");
423 &Torrus::DB::checkInterrupted();
425 # Build the arguments for RRDs::update.
429 # We will use the average timestamp
434 my $step = $collector->period();
436 foreach my $ds ( keys %ds_updating )
438 my $token = $ds_updating{$ds};
439 if( length($template) > 0 )
446 my ( $value, $timestamp, $uptime ) = ( 'U', $now, $now );
447 if( ref $sref->{'values'}{$token} )
449 ($value, $timestamp, $uptime) = @{$sref->{'values'}{$token}};
452 push( @timestamps, $timestamp );
453 if( $timestamp > $max_ts )
455 $max_ts = $timestamp;
457 if( $timestamp < $min_ts )
459 $min_ts = $timestamp;
462 # The plus sign generated by BigInt is not a problem for rrdtool
463 $values .= ':'. $value;
466 # Get the average timestamp
468 map {$sum += $_} @timestamps;
469 my $avg_ts = $sum / scalar( @timestamps );
471 if( ($max_ts - $avg_ts) > $Torrus::Global::RRDTimestampTolerance )
473 Error("Maximum timestamp value is beyond the tolerance in $filename");
475 if( ($avg_ts - $min_ts) > $Torrus::Global::RRDTimestampTolerance )
477 Error("Minimum timestamp value is beyond the tolerance in $filename");
480 my @cmd = ( "--template=" . $template,
481 sprintf("%d%s", $avg_ts, $values) );
483 &Torrus::DB::checkInterrupted();
487 # Process errors from RRD update thread
489 while( defined( $errfilename = $thrErrorsQueue->dequeue_nb() ) )
491 delete $sref->{'rrdinfo_ds'}{$errfilename};
494 Debug('Enqueueing update job for ' . $filename);
496 my $cmdlist = &threads::shared::share([]);
497 push( @{$cmdlist}, $filename, @cmd );
498 $thrUpdateQueue->enqueue( $cmdlist );
504 Debug("Updating $filename: " . join(' ', @cmd));
506 RRDs::update( $filename, @cmd );
507 my $err = RRDs::error();
510 Error("ERROR updating $filename: $err");
511 delete $sref->{'rrdinfo_ds'}{$filename};
517 # A background thread that updates RRD files
520 &Torrus::DB::setSafeSignalHandlers();
522 &Torrus::Log::setTID( threads->tid() );
525 &threads::shared::share( \$cmdlist );
529 &Torrus::DB::checkInterrupted();
531 $cmdlist = $thrUpdateQueue->dequeue();
535 Debug("Updating RRD: " . join(' ', @{$cmdlist}));
538 $rrdtoolSemaphore->down();
540 RRDs::update( @{$cmdlist} );
541 my $err = RRDs::error();
543 $rrdtoolSemaphore->up();
547 Error('ERROR updating' . $cmdlist->[0] . ': ' . $err);
548 $thrErrorsQueue->enqueue( $cmdlist->[0] );
555 # Callback executed by Collector
559 my $collector = shift;
562 my $sref = $collector->storageData( 'rrd' );
563 my $filename = $sref->{'filename'}{$token};
565 delete $sref->{'filename'}{$token};
567 delete $sref->{'byfile'}{$filename}{$token};
568 if( scalar( keys %{$sref->{'byfile'}{$filename}} ) == 0 )
570 delete $sref->{'byfile'}{$filename};
573 delete $sref->{'values'}{$token};
582 # indent-tabs-mode: nil
583 # perl-indent-level: 4