import torrus 1.0.9
[freeside.git] / torrus / perllib / Torrus / Collector.pm
1 #  Copyright (C) 2002  Stanislav Sinyagin
2 #
3 #  This program is free software; you can redistribute it and/or modify
4 #  it under the terms of the GNU General Public License as published by
5 #  the Free Software Foundation; either version 2 of the License, or
6 #  (at your option) any later version.
7 #
8 #  This program is distributed in the hope that it will be useful,
9 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 #  GNU General Public License for more details.
12 #
13 #  You should have received a copy of the GNU General Public License
14 #  along with this program; if not, write to the Free Software
15 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
16
17 # $Id: Collector.pm,v 1.1 2010-12-27 00:03:38 ivan Exp $
18 # Stanislav Sinyagin <ssinyagin@yahoo.com>
19
20
21 package Torrus::Collector;
22 @Torrus::Collector::ISA = qw(Torrus::Scheduler::PeriodicTask);
23
24 use strict;
25 use Torrus::ConfigTree;
26 use Torrus::Log;
27 use Torrus::RPN;
28 use Torrus::Scheduler;
29
30 BEGIN
31 {
32     foreach my $mod ( @Torrus::Collector::loadModules )
33     {
34         eval( 'require ' . $mod );
35         die( $@ ) if $@;
36     }
37 }
38
39 # Executed once after the fork. Here modules can launch processing threads
40 sub initThreads
41 {
42     foreach my $key ( %Torrus::Collector::initThreadsHandlers )
43     {
44         if( ref( $Torrus::Collector::initThreadsHandlers{$key} ) )
45         {
46             &{$Torrus::Collector::initThreadsHandlers{$key}}();
47         }
48     }
49 }
50
51
52 ## One collector module instance holds all leaf tokens which
53 ## must be collected at the same time.
54
55 sub new
56 {
57     my $proto = shift;
58     my %options = @_;
59
60     if( not $options{'-Name'} )
61     {
62         $options{'-Name'} = "Collector";
63     }
64
65     my $class = ref($proto) || $proto;
66     my $self  = $class->SUPER::new( %options );
67     bless $self, $class;
68
69     foreach my $collector_type ( keys %Torrus::Collector::collectorTypes )
70     {
71         $self->{'types'}{$collector_type} = {};
72         $self->{'types_in_use'}{$collector_type} = 0;
73     }
74
75     foreach my $storage_type ( keys %Torrus::Collector::storageTypes )
76     {
77         $self->{'storage'}{$storage_type} = {};
78         $self->{'storage_in_use'}{$storage_type} = 0;
79         
80         my $storage_string = $storage_type . '-storage';
81         if( ref( $Torrus::Collector::initStorage{$storage_string} ) )
82         {
83             &{$Torrus::Collector::initStorage{$storage_string}}($self);
84         }
85     }
86
87     $self->{'tree_name'} = $options{'-TreeName'};
88     
89     return $self;
90 }
91
92
93 sub addTarget
94 {
95     my $self = shift;
96     my $config_tree = shift;
97     my $token = shift;
98
99     my $ok = 1;
100     $self->{'targets'}{$token}{'path'} = $config_tree->path($token);
101
102     my $collector_type = $config_tree->getNodeParam($token, 'collector-type');
103     if( not $Torrus::Collector::collectorTypes{$collector_type} )
104     {
105         Error('Unknown collector type: ' . $collector_type);
106         return;
107     }
108
109     $self->fetchParams($config_tree, $token, $collector_type);
110
111     $self->{'targets'}{$token}{'type'} = $collector_type;
112     $self->{'types_in_use'}{$collector_type} = 1;
113     
114     my $storage_types = $config_tree->getNodeParam($token, 'storage-type');
115     foreach my $storage_type ( split( ',', $storage_types ) )
116     {
117         if( not $Torrus::Collector::storageTypes{$storage_type} )
118         {
119             Error('Unknown storage type: ' . $storage_type);
120         }
121         else
122         {
123             my $storage_string = $storage_type . '-storage';
124             if( not exists( $self->{'targets'}{$token}{'storage-types'} ) )
125             {
126                 $self->{'targets'}{$token}{'storage-types'} = [];
127             }
128             push( @{$self->{'targets'}{$token}{'storage-types'}},
129                   $storage_type );
130             
131             $self->fetchParams($config_tree, $token, $storage_string);
132             $self->{'storage_in_use'}{$storage_type} = 1;
133         }
134     }
135
136     # If specified, store the value transformation code
137     my $code = $config_tree->getNodeParam($token, 'transform-value');
138     if( defined $code )
139     {
140         $self->{'targets'}{$token}{'transform'} = $code;
141     }
142     
143     # If specified, store the scale RPN
144     my $scalerpn = $config_tree->getNodeParam($token, 'collector-scale');
145     if( defined $scalerpn )
146     {
147         $self->{'targets'}{$token}{'scalerpn'} = $scalerpn;
148     }
149     
150     # If specified, store the value map
151     my $valueMap = $config_tree->getNodeParam($token, 'value-map');
152     if( defined $valueMap and length($valueMap) > 0 )
153     {
154         my $map = {};
155         foreach my $item ( split( ',', $valueMap ) )
156         {
157             my ($key, $value) = split( ':', $item );
158             $map->{$key} = $value;
159         }
160         $self->{'targets'}{$token}{'value-map'} = $map;
161     }
162
163     # Initialize local token, collectpor, and storage data
164     if( not defined $self->{'targets'}{$token}{'local'} )
165     {
166         $self->{'targets'}{$token}{'local'} = {};
167     }
168
169     if( ref( $Torrus::Collector::initTarget{$collector_type} ) )
170     {
171         $ok = &{$Torrus::Collector::initTarget{$collector_type}}($self,
172                                                                  $token);
173     }
174
175     if( $ok )
176     {
177         foreach my $storage_type
178             ( @{$self->{'targets'}{$token}{'storage-types'}} )
179         {
180             my $storage_string = $storage_type . '-storage';
181             if( ref( $Torrus::Collector::initTarget{$storage_string} ) )
182             {
183                 &{$Torrus::Collector::initTarget{$storage_string}}($self,
184                                                                    $token);
185             }
186         }
187     }
188     
189     if( not $ok )
190     {
191         $self->deleteTarget( $token );
192     }
193 }
194
195
196 sub fetchParams
197 {
198     my $self = shift;
199     my $config_tree = shift;
200     my $token = shift;
201     my $type = shift;
202
203     if( not defined( $Torrus::Collector::params{$type} ) )
204     {
205         Error("\%Torrus::Collector::params does not have member $type");
206         return;
207     }
208
209     my $ref = \$self->{'targets'}{$token}{'params'};
210
211     my @maps = ( $Torrus::Collector::params{$type} );
212
213     while( scalar( @maps ) > 0 )
214     {
215         &Torrus::DB::checkInterrupted();
216         
217         my @next_maps = ();
218         foreach my $map ( @maps )
219         {
220             foreach my $param ( keys %{$map} )
221             {
222                 my $value = $config_tree->getNodeParam( $token, $param );
223
224                 if( ref( $map->{$param} ) )
225                 {
226                     if( defined $value )
227                     {
228                         if( exists $map->{$param}->{$value} )
229                         {
230                             if( defined $map->{$param}->{$value} )
231                             {
232                                 push( @next_maps,
233                                       $map->{$param}->{$value} );
234                             }
235                         }
236                         else
237                         {
238                             Error("Parameter $param has unknown value: " .
239                                   $value . " in " . $self->path($token));
240                         }
241                     }
242                 }
243                 else
244                 {
245                     if( not defined $value )
246                     {
247                         # We know the default value
248                         $value = $map->{$param};
249                     }
250                 }
251                 # Finally store the value
252                 if( defined $value )
253                 {
254                     $$ref->{$param} = $value;
255                 }
256             }
257         }
258         @maps = @next_maps;
259     }
260 }
261
262
263 sub fetchMoreParams
264 {
265     my $self = shift;
266     my $config_tree = shift;
267     my $token = shift;
268     my @params = @_;
269
270     &Torrus::DB::checkInterrupted();
271
272     my $ref = \$self->{'targets'}{$token}{'params'};
273
274     foreach my $param ( @params )
275     {
276         my $value = $config_tree->getNodeParam( $token, $param );
277         if( defined $value )
278         {
279             $$ref->{$param} = $value;
280         }
281     }
282 }
283
284
285 sub param
286 {
287     my $self = shift;
288     my $token = shift;
289     my $param = shift;
290
291     return $self->{'targets'}{$token}{'params'}{$param};
292 }
293
294 sub setParam
295 {
296     my $self = shift;
297     my $token = shift;
298     my $param = shift;
299     my $value = shift;
300
301     $self->{'targets'}{$token}{'params'}{$param} = $value;
302 }
303
304
305 sub path
306 {
307     my $self = shift;
308     my $token = shift;
309
310     return $self->{'targets'}{$token}{'path'};
311 }
312
313 sub listCollectorTargets
314 {
315     my $self = shift;
316     my $collector_type = shift;
317
318     my @ret;
319     foreach my $token ( keys %{$self->{'targets'}} )
320     {
321         if( $self->{'targets'}{$token}{'type'} eq $collector_type )
322         {
323             push( @ret, $token );
324         }
325     }
326     return @ret;
327 }
328
329 # A callback procedure that will be executed on deleteTarget()
330
331 sub registerDeleteCallback
332 {
333     my $self = shift;
334     my $token = shift;
335     my $proc = shift;
336
337     if( not ref( $self->{'targets'}{$token}{'deleteProc'} ) )
338     {
339         $self->{'targets'}{$token}{'deleteProc'} = [];
340     }
341     push( @{$self->{'targets'}{$token}{'deleteProc'}}, $proc );
342 }
343
344 sub deleteTarget
345 {
346     my $self = shift;
347     my $token = shift;
348
349     &Torrus::DB::checkInterrupted();
350
351     Info('Deleting target: ' . $self->path($token));
352     
353     if( ref( $self->{'targets'}{$token}{'deleteProc'} ) )
354     {
355         foreach my $proc ( @{$self->{'targets'}{$token}{'deleteProc'}} )
356         {
357             &{$proc}( $self, $token );
358         }
359     }
360     delete $self->{'targets'}{$token};
361 }
362
363 # Returns a reference to token-specific local data
364
365 sub tokenData
366 {
367     my $self = shift;
368     my $token = shift;
369
370     return $self->{'targets'}{$token}{'local'};
371 }
372
373 # Returns a reference to collector type-specific local data
374
375 sub collectorData
376 {
377     my $self = shift;
378     my $type = shift;
379
380     return $self->{'types'}{$type};
381 }
382
383 # Returns a reference to storage type-specific local data
384
385 sub storageData
386 {
387     my $self = shift;
388     my $type = shift;
389
390     return $self->{'storage'}{$type};
391 }
392
393
394 # Runs each collector type, and then stores the values
395 sub run
396 {
397     my $self = shift;
398
399     undef $self->{'values'};
400
401     while( my ($collector_type, $ref) = each %{$self->{'types'}} )
402     {
403         next unless $self->{'types_in_use'}{$collector_type};
404
405         &Torrus::DB::checkInterrupted();
406         
407         if( $Torrus::Collector::needsConfigTree
408             {$collector_type}{'runCollector'} )
409         {
410             $self->{'config_tree'} =
411                 new Torrus::ConfigTree( -TreeName => $self->{'tree_name'},
412                                         -Wait => 1 );
413         }
414         
415         &{$Torrus::Collector::runCollector{$collector_type}}( $self, $ref );
416         
417         if( defined( $self->{'config_tree'} ) )
418         {
419             undef $self->{'config_tree'};
420         }
421     }
422
423     while( my ($storage_type, $ref) = each %{$self->{'storage'}} )
424     {
425         next unless $self->{'storage_in_use'}{$storage_type};
426         
427         &Torrus::DB::checkInterrupted();
428         
429         if( $Torrus::Collector::needsConfigTree
430             {$storage_type}{'storeData'} )
431         {
432             $self->{'config_tree'} =
433                 new Torrus::ConfigTree( -TreeName => $self->{'tree_name'},
434                                         -Wait => 1 );
435         }
436
437         &{$Torrus::Collector::storeData{$storage_type}}( $self, $ref );
438
439         if( defined( $self->{'config_tree'} ) )
440         {
441             undef $self->{'config_tree'};
442         }        
443     }
444     
445     while( my ($collector_type, $ref) = each %{$self->{'types'}} )
446     {
447         next unless $self->{'types_in_use'}{$collector_type};
448         
449         if( ref( $Torrus::Collector::postProcess{$collector_type} ) )
450         {
451             &Torrus::DB::checkInterrupted();
452             
453             if( $Torrus::Collector::needsConfigTree
454                 {$collector_type}{'postProcess'} )
455             {
456                 $self->{'config_tree'} =
457                     new Torrus::ConfigTree( -TreeName => $self->{'tree_name'},
458                                             -Wait => 1 );
459             }
460             
461             &{$Torrus::Collector::postProcess{$collector_type}}( $self, $ref );
462
463             if( defined( $self->{'config_tree'} ) )
464             {
465                 undef $self->{'config_tree'};
466             }
467         }
468     }
469 }
470
471
472 # This procedure is called by the collector type-specific functions
473 # every time there's a new value for a token
474 sub setValue
475 {
476     my $self = shift;
477     my $token = shift;
478     my $value = shift;
479     my $timestamp = shift;
480     my $uptime = shift;
481
482     if( $value ne 'U' )
483     {
484         if( defined( my $code = $self->{'targets'}{$token}{'transform'} ) )
485         {            
486             # Screen out the percent sign and $_
487             $code =~ s/DOLLAR/\$/gm;
488             $code =~ s/MOD/\%/gm;
489             Debug('Value before transformation: ' . $value);
490             $_ = $value;
491             $value = do { eval $code };
492             if( $@ )
493             {
494                 Error('Fatal error in transformation code: ' . $@ );
495                 $value = 'U';
496             }
497             elsif( $value !~ /^[0-9.+-eE]+$/o and $value ne 'U' )
498             {
499                 Error('Non-numeric value after transformation: ' . $value);
500                 $value = 'U';
501             }
502         }
503         elsif( defined( my $map = $self->{'targets'}{$token}{'value-map'} ) )
504         {
505             my $newValue;
506             if( defined( $map->{$value} ) )
507             {
508                 $newValue = $map->{$value};
509             }
510             elsif( defined( $map->{'_'} ) )
511             {
512                 $newValue = $map->{'_'};
513             }
514             else
515             {
516                 Warn('Could not find value mapping for ' . $value .
517                      'in ' . $self->path($token));
518             }
519
520             if( defined( $newValue ) )
521             {
522                 Debug('Value mapping: ' . $value . ' -> ' . $newValue);
523                 $value = $newValue;
524             }
525         }
526
527         if( defined( $self->{'targets'}{$token}{'scalerpn'} ) )
528         {
529             Debug('Value before scaling: ' . $value);
530             my $rpn = new Torrus::RPN;
531             $value = $rpn->run( $value . ',' .
532                                 $self->{'targets'}{$token}{'scalerpn'},
533                                 sub{} );
534         }
535     }
536
537     if( isDebug() )
538     {
539         Debug('Value ' . $value . ' set for ' .
540               $self->path($token) . ' TS=' . $timestamp);
541     }
542
543     foreach my $storage_type
544         ( @{$self->{'targets'}{$token}{'storage-types'}} )
545     {
546         &{$Torrus::Collector::setValue{$storage_type}}( $self, $token,
547                                                         $value, $timestamp,
548                                                         $uptime );
549     }
550 }
551
552
553 sub configTree
554 {
555     my $self = shift;
556
557     if( defined( $self->{'config_tree'} ) )
558     {
559         return $self->{'config_tree'};
560     }
561     else
562     {
563         Error('Cannot provide ConfigTree object');
564         return undef;
565     }
566 }
567
568
569 #######  Collector scheduler  ########
570
571 package Torrus::CollectorScheduler;
572 @Torrus::CollectorScheduler::ISA = qw(Torrus::Scheduler);
573
574 use Torrus::ConfigTree;
575 use Torrus::Log;
576 use Torrus::Scheduler;
577 use Torrus::TimeStamp;
578
579
580 sub beforeRun
581 {
582     my $self = shift;
583
584     &Torrus::DB::checkInterrupted();
585
586     my $tree = $self->treeName();
587     my $config_tree = new Torrus::ConfigTree(-TreeName => $tree, -Wait => 1);
588     if( not defined( $config_tree ) )
589     {
590         return undef;
591     }
592
593     my $data = $self->data();
594
595     my $instance = $self->{'options'}{'-Instance'};
596         
597     # Prepare the list of tokens, sorted by period and offset,
598     # from config tree or from cache.
599
600     my $need_new_tasks = 0;
601
602     Torrus::TimeStamp::init();
603     my $timestamp_key = $tree . ':' . $instance . ':collector_cache';
604     my $known_ts = Torrus::TimeStamp::get( $timestamp_key );
605     my $actual_ts = $config_tree->getTimestamp();
606     
607     if( $actual_ts >= $known_ts or not $data->{'targets_initialized'} )
608     {
609         Info('Initializing tasks for collector instance ' . $instance);
610         Debug("Config TS: $actual_ts, Collector TS: $known_ts");
611         my $init_start = time();
612
613         my $targets = {};
614
615         my $db_tokens =
616             new Torrus::DB('collector_tokens' . '_' . $instance . '_' .
617                            $config_tree->{'ds_config_instance'},
618                            -Subdir => $tree);
619         
620         my $cursor = $db_tokens->cursor();
621         while( my ($token, $schedule) = $db_tokens->next($cursor) )
622         {
623             my ($period, $offset) = split(/:/o, $schedule);
624             if( not exists( $targets->{$period}{$offset} ) )
625             {
626                 $targets->{$period}{$offset} = [];
627             }
628             push( @{$targets->{$period}{$offset}}, $token );
629
630             &Torrus::DB::checkInterrupted();
631         }
632         undef $cursor;
633         $db_tokens->closeNow();
634         undef $db_tokens;
635
636         &Torrus::DB::checkInterrupted();
637
638         # Set the timestamp
639         &Torrus::TimeStamp::setNow( $timestamp_key );
640         
641         $self->flushTasks();
642
643         foreach my $period ( keys %{$targets} )
644         {
645             foreach my $offset ( keys %{$targets->{$period}} )
646             {
647                 my $collector =
648                     new Torrus::Collector( -Period => $period,
649                                            -Offset => $offset,
650                                            -TreeName => $tree,
651                                            -Instance => $instance );
652
653                 foreach my $token ( @{$targets->{$period}{$offset}} )
654                 {
655                     &Torrus::DB::checkInterrupted();
656                     $collector->addTarget( $config_tree, $token );
657                 }
658
659                 $self->addTask( $collector );
660             }
661         }
662         Verbose(sprintf("Tasks initialization finished in %d seconds",
663                         time() - $init_start));
664
665         $data->{'targets_initialized'} = 1;        
666         Info('Tasks for collector instance ' . $instance . ' initialized');
667
668         foreach my $collector_type ( keys %Torrus::Collector::collectorTypes )
669         {
670             if( ref($Torrus::Collector::initCollectorGlobals{
671                 $collector_type}) )
672             {
673                 &{$Torrus::Collector::initCollectorGlobals{
674                     $collector_type}}($tree, $instance);
675                 
676                 Verbose('Initialized collector globals for type: ' .
677                         $collector_type);
678             }
679         }
680     }
681     
682     Torrus::TimeStamp::release();
683     
684     return 1;
685 }
686
687
688 1;
689
690
691 # Local Variables:
692 # mode: perl
693 # indent-tabs-mode: nil
694 # perl-indent-level: 4
695 # End: