import torrus 1.0.9
[freeside.git] / torrus / perllib / Torrus / Collector / RRDStorage.pm
1 #  Copyright (C) 2002-2007  Stanislav Sinyagin
2 #
3 #  This program is free software; you can redistribute it and/or modify
4 #  it under the terms of the GNU General Public License as published by
5 #  the Free Software Foundation; either version 2 of the License, or
6 #  (at your option) any later version.
7 #
8 #  This program is distributed in the hope that it will be useful,
9 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 #  GNU General Public License for more details.
12 #
13 #  You should have received a copy of the GNU General Public License
14 #  along with this program; if not, write to the Free Software
15 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
16
17 # $Id: RRDStorage.pm,v 1.1 2010-12-27 00:03:58 ivan Exp $
18 # Stanislav Sinyagin <ssinyagin@yahoo.com>
19
20 package Torrus::Collector::RRDStorage;
21
22 use Torrus::ConfigTree;
23 use Torrus::Log;
24
25 use strict;
26 use RRDs;
27
28 our $useThreads;
29 our $threadsInUse = 0;
30 our $thrQueueLimit;
31 our $thrUpdateQueue;
32 our $thrErrorsQueue;
33 # RRDtool is not reentrant. use this semaphore for every call to RRDs::*
34 our $rrdtoolSemaphore;
35 our $thrUpdateThread;
36
37 our $moveConflictRRD;
38 our $conflictRRDPath;
39
40 # Register the storage type
41 $Torrus::Collector::storageTypes{'rrd'} = 1;
42
43
44 # List of needed parameters and default values
45
46 $Torrus::Collector::params{'rrd-storage'} = {
47     'data-dir' => undef,
48     'data-file' => undef,
49     'rrd-create-rra' => undef,
50     'rrd-create-heartbeat' => undef,
51     'rrd-create-min'  => 'U',
52     'rrd-create-max'  => 'U',
53     'rrd-hwpredict'   => {
54         'enabled' => {
55             'rrd-create-hw-alpha' => 0.1,
56             'rrd-create-hw-beta'  => 0.0035,
57             'rrd-create-hw-gamma' => 0.1,
58             'rrd-create-hw-winlen' => 9,
59             'rrd-create-hw-failth' => 6,
60             'rrd-create-hw-season' => 288,
61             'rrd-create-hw-rralen' => undef },
62         'disabled' => undef },
63     'rrd-create-dstype' => undef,
64     'rrd-ds' => undef
65     };
66
67
68 $Torrus::Collector::initThreadsHandlers{'rrd-storage'} =
69     \&Torrus::Collector::RRDStorage::initThreads;
70
71 sub initThreads
72 {
73     if( $useThreads and not defined( $thrUpdateThread ) )
74     {
75         Verbose('RRD storage is configured for multithreading. Initializing ' .
76                 'the background thread');
77         require threads;
78         require threads::shared;
79         require Thread::Queue;
80         require Thread::Semaphore;
81
82         $thrUpdateQueue = new Thread::Queue;
83         $thrErrorsQueue = new Thread::Queue;
84         $rrdtoolSemaphore = new Thread::Semaphore;
85         
86         $thrUpdateThread = threads->create( \&rrdUpdateThread );
87         $thrUpdateThread->detach();
88         $threadsInUse = 1;
89     }
90 }
91
92
93
94 $Torrus::Collector::initTarget{'rrd-storage'} =
95     \&Torrus::Collector::RRDStorage::initTarget;
96
97 sub initTarget
98 {
99     my $collector = shift;
100     my $token = shift;
101
102     my $sref = $collector->storageData( 'rrd' );
103
104     $collector->registerDeleteCallback
105         ( $token, \&Torrus::Collector::RRDStorage::deleteTarget );
106
107     my $filename =
108         $collector->param($token, 'data-dir') . '/' .
109         $collector->param($token, 'data-file');
110
111     $sref->{'byfile'}{$filename}{$token} = 1;
112     $sref->{'filename'}{$token} = $filename;
113 }
114
115
116
117 $Torrus::Collector::setValue{'rrd'} =
118     \&Torrus::Collector::RRDStorage::setValue;
119
120
121 sub setValue
122 {
123     my $collector = shift;
124     my $token = shift;
125     my $value = shift;
126     my $timestamp = shift;
127     my $uptime = shift;
128
129     my $sref = $collector->storageData( 'rrd' );
130
131     $sref->{'values'}{$token} = [$value, $timestamp, $uptime];
132 }
133
134
135 $Torrus::Collector::storeData{'rrd'} =
136     \&Torrus::Collector::RRDStorage::storeData;
137
138 sub storeData
139 {
140     my $collector = shift;
141     my $sref = shift;
142
143     if( $threadsInUse )
144     {
145         $collector->setStatValue( 'RRDQueue', $thrUpdateQueue->pending() );
146     }
147     
148     if( $threadsInUse and $thrUpdateQueue->pending() > $thrQueueLimit )
149     {
150         Error('Cannot enqueue RRD files for updating: ' .
151               'queue size is above limit');
152     }
153     else
154     {
155         while( my ($filename, $tokens) = each %{$sref->{'byfile'}} )
156         {
157             &Torrus::DB::checkInterrupted();
158             
159             if( not -e $filename )
160             {
161                 createRRD( $collector, $sref, $filename, $tokens );
162             }
163             
164             if( -e $filename )
165             {
166                 updateRRD( $collector, $sref, $filename, $tokens );
167             }
168         }
169     }
170
171     delete $sref->{'values'};
172 }
173
174
175 sub semaphoreDown
176 {    
177     if( $threadsInUse )
178     {
179         $rrdtoolSemaphore->down();
180     }
181 }
182
183 sub semaphoreUp
184 {
185     if( $threadsInUse )
186     {
187         $rrdtoolSemaphore->up();
188     }
189 }
190
191
192 sub createRRD
193 {
194     my $collector = shift;
195     my $sref = shift;
196     my $filename = shift;
197     my $tokens  = shift;
198
199     # We use hashes here, in order to make the superset of RRA
200     # definitions, and unique RRD names
201     my %DS_hash;
202     my %RRA_hash;
203
204     # Holt-Winters parameters
205     my $needs_hw = 0;
206     my %hwparam;
207
208     my $timestamp = time();
209
210     foreach my $token ( keys %{$tokens} )
211     {
212         my $ds_string =
213             sprintf('DS:%s:%s:%d:%s:%s',
214                     $collector->param($token, 'rrd-ds'),
215                     $collector->param($token, 'rrd-create-dstype'),
216                     $collector->param($token, 'rrd-create-heartbeat'),
217                     $collector->param($token, 'rrd-create-min'),
218                     $collector->param($token, 'rrd-create-max'));
219         $DS_hash{$ds_string} = 1;
220
221         foreach my $rra_string
222             ( split(/\s+/, $collector->param($token, 'rrd-create-rra')) )
223         {
224             $RRA_hash{$rra_string} = 1;
225         }
226
227         if( $collector->param($token, 'rrd-hwpredict') eq 'enabled' )
228         {
229             $needs_hw = 1;
230
231             foreach my $param ( 'alpha', 'beta', 'gamma', 'winlen', 'failth',
232                                 'season', 'rralen' )
233             {
234                 my $value = $collector->param($token, 'rrd-create-hw-'.$param);
235
236                 if( defined( $hwparam{$param} ) and
237                     $hwparam{$param} != $value )
238                 {
239                     my $paramname = 'rrd-create-hw-'.$param;
240                     Warn("Parameter " . $paramname . " was already defined " .
241                          "with differentr value for " . $filename);
242                 }
243
244                 $hwparam{$param} = $value;
245             }
246         }
247
248         if( ref $sref->{'values'}{$token} )
249         {
250             my $new_ts = $sref->{'values'}{$token}[1];
251             if( $new_ts > 0 and $new_ts < $timestamp )
252             {
253                 $timestamp = $new_ts;
254             }
255         }
256     }
257
258     my @DS = sort keys %DS_hash;
259     my @RRA = sort keys %RRA_hash;
260
261     if( $needs_hw )
262     {
263         ## Define the RRAs for Holt-Winters prediction
264
265         my $hwpredict_rran   = scalar(@RRA) + 1;
266         my $seasonal_rran    = $hwpredict_rran + 1;
267         my $devseasonal_rran = $hwpredict_rran + 2;
268         my $devpredict_rran  = $hwpredict_rran + 3;
269         my $failures_rran    = $hwpredict_rran + 4;
270
271         push( @RRA, sprintf('RRA:HWPREDICT:%d:%e:%e:%d:%d',
272                             $hwparam{'rralen'},
273                             $hwparam{'alpha'},
274                             $hwparam{'beta'},
275                             $hwparam{'season'},
276                             $seasonal_rran));
277
278         push( @RRA, sprintf('RRA:SEASONAL:%d:%e:%d',
279                             $hwparam{'season'},
280                             $hwparam{'gamma'},
281                             $hwpredict_rran));
282
283         push( @RRA, sprintf('RRA:DEVSEASONAL:%d:%e:%d',
284                             $hwparam{'season'},
285                             $hwparam{'gamma'},
286                             $hwpredict_rran));
287
288         push( @RRA, sprintf('RRA:DEVPREDICT:%d:%d',
289                             $hwparam{'rralen'},
290                             $devseasonal_rran));
291
292         push( @RRA, sprintf('RRA:FAILURES:%d:%d:%d:%d',
293                             $hwparam{'rralen'},
294                             $hwparam{'failth'},
295                             $hwparam{'winlen'},
296                             $devseasonal_rran));
297     }
298
299     my $step = $collector->period();
300     my $start = $timestamp - $step;
301
302     my @OPT = ( sprintf( '--start=%d', $start ),
303                 sprintf( '--step=%d', $step ) );
304
305     &Torrus::DB::checkInterrupted();
306     
307     Debug("Creating RRD $filename: " . join(" ", @OPT, @DS, @RRA));
308
309     semaphoreDown();
310     
311     RRDs::create($filename,
312                  @OPT,
313                  @DS,
314                  @RRA);
315
316     my $err = RRDs::error();
317
318     semaphoreUp();
319
320     Error("ERROR creating $filename: $err") if $err;
321     
322     delete $sref->{'rrdinfo_ds'}{$filename};
323 }
324
325
326 sub updateRRD
327 {
328     my $collector = shift;
329     my $sref = shift;
330     my $filename = shift;
331     my $tokens  = shift;
332
333     if( not defined( $sref->{'rrdinfo_ds'}{$filename} ) )
334     {
335         my $ref = {};
336         $sref->{'rrdinfo_ds'}{$filename} = $ref;
337
338         semaphoreDown();
339         
340         my $rrdinfo = RRDs::info( $filename );
341
342         semaphoreUp();
343
344         foreach my $prop ( keys %$rrdinfo )
345         {
346             if( $prop =~ /^ds\[(\S+)\]\./o )
347             {
348                 $ref->{$1} = 1;
349             }
350         }
351         
352         &Torrus::DB::checkInterrupted();
353     }
354
355     # First we compare the sets of datasources in our memory and in RRD file
356     my %ds_updating = ();
357     my $ds_conflict = 0;
358
359     foreach my $token ( keys %{$tokens} )
360     {
361         $ds_updating{ $collector->param($token, 'rrd-ds') } = $token;
362     }
363
364     # Check if we update all datasources in RRD file
365     foreach my $ds ( keys %{$sref->{'rrdinfo_ds'}{$filename}} )
366     {
367         if( not $ds_updating{$ds} )
368         {
369             Warn('Datasource exists in RRD file, but it is not updated: ' .
370                  $ds . ' in ' . $filename);
371             $ds_conflict = 1;
372         }
373     }
374
375     # Check if all DS that we update are defined in RRD
376     foreach my $ds ( keys %ds_updating )
377     {
378         if( not $sref->{'rrdinfo_ds'}{$filename}{$ds} )
379         {
380             Error("Datasource being updated does not exist: $ds in $filename");
381             delete $ds_updating{$ds};
382             $ds_conflict = 1;
383         }
384     }
385
386     if( $ds_conflict and $moveConflictRRD )
387     {
388         if( not -f $filename )
389         {
390             Error($filename . 'is not a regular file');
391             return;
392         }
393         
394         my( $sec, $min, $hour, $mday, $mon, $year) = localtime( time() );
395         my $destfile = sprintf('%s_%04d%02d%02d%02d%02d',
396                                $filename,
397                                $year + 1900, $mon+1, $mday, $hour, $min);
398         
399         my $destdir = $conflictRRDPath;
400         if( defined( $destdir ) and -d $destdir )
401         {
402             my @fpath = split('/', $destfile);
403             my $fname = pop( @fpath );
404             $destfile = $destdir . '/' . $fname;
405         }
406
407         Warn('Moving the conflicted RRD file ' . $filename .
408              ' to ' . $destfile);
409         rename( $filename, $destfile ) or
410             Error("Cannot rename $filename to $destfile: $!");
411         
412         delete $sref->{'rrdinfo_ds'}{$filename};
413         
414         createRRD( $collector, $sref, $filename, $tokens );
415     }
416         
417     if( scalar( keys %ds_updating ) == 0 )
418     {
419         Error("No datasources to update in $filename");
420         return;
421     }
422
423     &Torrus::DB::checkInterrupted();
424
425     # Build the arguments for RRDs::update.
426     my $template;
427     my $values;
428
429     # We will use the average timestamp
430     my @timestamps;
431     my $max_ts = 0;
432     my $min_ts = time();
433
434     my $step = $collector->period();
435
436     foreach my $ds ( keys %ds_updating )
437     {
438         my $token = $ds_updating{$ds};
439         if( length($template) > 0 )
440         {
441             $template .= ':';
442         }
443         $template .= $ds;
444
445         my $now = time();
446         my ( $value, $timestamp, $uptime ) = ( 'U', $now, $now );
447         if( ref $sref->{'values'}{$token} )
448         {
449             ($value, $timestamp, $uptime) = @{$sref->{'values'}{$token}};
450         }
451
452         push( @timestamps, $timestamp );
453         if( $timestamp > $max_ts )
454         {
455             $max_ts = $timestamp;
456         }
457         if( $timestamp < $min_ts )
458         {
459             $min_ts = $timestamp;
460         }
461
462         # The plus sign generated by BigInt is not a problem for rrdtool
463         $values .= ':'. $value;
464     }
465
466     # Get the average timestamp
467     my $sum = 0;
468     map {$sum += $_} @timestamps;
469     my $avg_ts = $sum / scalar( @timestamps );
470
471     if( ($max_ts - $avg_ts) > $Torrus::Global::RRDTimestampTolerance )
472     {
473         Error("Maximum timestamp value is beyond the tolerance in $filename");
474     }
475     if( ($avg_ts - $min_ts) > $Torrus::Global::RRDTimestampTolerance )
476     {
477         Error("Minimum timestamp value is beyond the tolerance in $filename");
478     }
479
480     my @cmd = ( "--template=" . $template,
481                 sprintf("%d%s", $avg_ts, $values) );
482
483     &Torrus::DB::checkInterrupted();
484
485     if( $threadsInUse )
486     {
487         # Process errors from RRD update thread
488         my $errfilename;
489         while( defined( $errfilename = $thrErrorsQueue->dequeue_nb() ) )
490         {
491             delete $sref->{'rrdinfo_ds'}{$errfilename};
492         }
493
494         Debug('Enqueueing update job for ' . $filename);
495         
496         my $cmdlist = &threads::shared::share([]);
497         push( @{$cmdlist}, $filename, @cmd );
498         $thrUpdateQueue->enqueue( $cmdlist );
499     }
500     else
501     {
502         if( isDebug )
503         {
504             Debug("Updating $filename: " . join(' ', @cmd));
505         }
506         RRDs::update( $filename, @cmd );
507         my $err = RRDs::error();
508         if( $err )
509         {
510             Error("ERROR updating $filename: $err");
511             delete $sref->{'rrdinfo_ds'}{$filename};
512         }
513     }
514 }
515
516
517 # A background thread that updates RRD files
518 sub rrdUpdateThread
519 {
520     &Torrus::DB::setSafeSignalHandlers();
521     $| = 1;
522     &Torrus::Log::setTID( threads->tid() );
523     
524     my $cmdlist;
525     &threads::shared::share( \$cmdlist );
526     
527     while(1)
528     {
529         &Torrus::DB::checkInterrupted();
530         
531         $cmdlist = $thrUpdateQueue->dequeue();
532         
533         if( isDebug )
534         {
535             Debug("Updating RRD: " . join(' ', @{$cmdlist}));
536         }
537
538         $rrdtoolSemaphore->down();
539
540         RRDs::update( @{$cmdlist} );
541         my $err = RRDs::error();
542
543         $rrdtoolSemaphore->up();
544
545         if( $err )
546         {
547             Error('ERROR updating' . $cmdlist->[0] . ': ' . $err);
548             $thrErrorsQueue->enqueue( $cmdlist->[0] );
549         }
550     }
551 }
552
553
554
555 # Callback executed by Collector
556
557 sub deleteTarget
558 {
559     my $collector = shift;
560     my $token = shift;
561
562     my $sref = $collector->storageData( 'rrd' );
563     my $filename = $sref->{'filename'}{$token};
564
565     delete $sref->{'filename'}{$token};
566
567     delete $sref->{'byfile'}{$filename}{$token};
568     if( scalar( keys %{$sref->{'byfile'}{$filename}} ) == 0 )
569     {
570         delete $sref->{'byfile'}{$filename};
571     }
572
573     delete $sref->{'values'}{$token};
574 }
575
576
577 1;
578
579
580 # Local Variables:
581 # mode: perl
582 # indent-tabs-mode: nil
583 # perl-indent-level: 4
584 # End: