1 # Copyright (C) 2002 Stanislav Sinyagin
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
17 # $Id: Collector.pm,v 1.1 2010-12-27 00:03:38 ivan Exp $
18 # Stanislav Sinyagin <ssinyagin@yahoo.com>
21 package Torrus::Collector;
22 @Torrus::Collector::ISA = qw(Torrus::Scheduler::PeriodicTask);
25 use Torrus::ConfigTree;
28 use Torrus::Scheduler;
# Load every module named in @Torrus::Collector::loadModules by evaluating
# the string "require <module>" for each entry.
# NOTE(review): no check of $@ after the eval is visible in this listing, and
# the enclosing block delimiters are not shown -- confirm against the full file.
32 foreach my $mod ( @Torrus::Collector::loadModules )
34 eval( 'require ' . $mod );
39 # Executed once after the fork. Here modules can launch processing threads
# Walk the handler table by key. Iterating the bare hash would return keys
# and values interleaved, so every handler value would also be used as a
# (never matching) lookup key.
42 foreach my $key ( keys %Torrus::Collector::initThreadsHandlers )
# Only entries that hold a reference are invoked; each is called without
# arguments.
44 if( ref( $Torrus::Collector::initThreadsHandlers{$key} ) )
46 &{$Torrus::Collector::initThreadsHandlers{$key}}();
52 ## One collector module instance holds all leaf tokens which
53 ## must be collected at the same time.
# Constructor fragment: builds the object through the parent class
# constructor and prepares per-type bookkeeping.
# NOTE(review): the sub header, the unpacking of $proto / %options and the
# return statement are not shown in this listing.
# Default task name when the caller did not supply -Name.
60 if( not $options{'-Name'} )
62 $options{'-Name'} = "Collector";
# Accept either a class name or an existing object as the invocant.
65 my $class = ref($proto) || $proto;
66 my $self = $class->SUPER::new( %options );
# One private data hash and one "in use" flag per registered collector type.
69 foreach my $collector_type ( keys %Torrus::Collector::collectorTypes )
71 $self->{'types'}{$collector_type} = {};
72 $self->{'types_in_use'}{$collector_type} = 0;
# Same bookkeeping for every registered storage type.
75 foreach my $storage_type ( keys %Torrus::Collector::storageTypes )
77 $self->{'storage'}{$storage_type} = {};
78 $self->{'storage_in_use'}{$storage_type} = 0;
# Storage handlers are registered under the key "<type>-storage"; call the
# init handler with this object when one is registered.
80 my $storage_string = $storage_type . '-storage';
81 if( ref( $Torrus::Collector::initStorage{$storage_string} ) )
83 &{$Torrus::Collector::initStorage{$storage_string}}($self);
# Remember the tree name so a ConfigTree can be opened later on demand.
87 $self->{'tree_name'} = $options{'-TreeName'};
# Register one leaf token with this collector: record its path, collector
# type, storage types and optional value-processing settings under
# $self->{'targets'}{$token}, then run the per-type target init handlers.
# NOTE(review): this listing omits some lines (declarations, braces, else
# branches, statement continuations); the comments below describe only the
# statements that are shown.
96 my $config_tree = shift;
100 $self->{'targets'}{$token}{'path'} = $config_tree->path($token);
# The token's 'collector-type' must be a key of
# %Torrus::Collector::collectorTypes; otherwise an error is logged.
102 my $collector_type = $config_tree->getNodeParam($token, 'collector-type');
103 if( not $Torrus::Collector::collectorTypes{$collector_type} )
105 Error('Unknown collector type: ' . $collector_type);
# Fetch the parameters declared for this collector type and mark the type
# as in use.
109 $self->fetchParams($config_tree, $token, $collector_type);
111 $self->{'targets'}{$token}{'type'} = $collector_type;
112 $self->{'types_in_use'}{$collector_type} = 1;
# 'storage-type' is a comma-separated list; every entry must be a key of
# %Torrus::Collector::storageTypes.
114 my $storage_types = $config_tree->getNodeParam($token, 'storage-type');
115 foreach my $storage_type ( split( ',', $storage_types ) )
117 if( not $Torrus::Collector::storageTypes{$storage_type} )
119 Error('Unknown storage type: ' . $storage_type);
# Storage parameters are looked up under the key "<type>-storage".
123 my $storage_string = $storage_type . '-storage';
124 if( not exists( $self->{'targets'}{$token}{'storage-types'} ) )
126 $self->{'targets'}{$token}{'storage-types'} = [];
128 push( @{$self->{'targets'}{$token}{'storage-types'}},
131 $self->fetchParams($config_tree, $token, $storage_string);
132 $self->{'storage_in_use'}{$storage_type} = 1;
136 # If specified, store the value transformation code
137 my $code = $config_tree->getNodeParam($token, 'transform-value');
# NOTE(review): the condition guarding this store is not shown here.
140 $self->{'targets'}{$token}{'transform'} = $code;
143 # If specified, store the scale RPN
144 my $scalerpn = $config_tree->getNodeParam($token, 'collector-scale');
145 if( defined $scalerpn )
147 $self->{'targets'}{$token}{'scalerpn'} = $scalerpn;
150 # If specified, store the value map
151 my $valueMap = $config_tree->getNodeParam($token, 'value-map');
152 if( defined $valueMap and length($valueMap) > 0 )
# The map is written as comma-separated "key:value" items.
155 foreach my $item ( split( ',', $valueMap ) )
157 my ($key, $value) = split( ':', $item );
158 $map->{$key} = $value;
160 $self->{'targets'}{$token}{'value-map'} = $map;
163 # Initialize local token, collector, and storage data
164 if( not defined $self->{'targets'}{$token}{'local'} )
166 $self->{'targets'}{$token}{'local'} = {};
# Let the collector type's initTarget handler, if registered, set up the
# token; its return value is kept in $ok.
169 if( ref( $Torrus::Collector::initTarget{$collector_type} ) )
171 $ok = &{$Torrus::Collector::initTarget{$collector_type}}($self,
# Give each of the token's storage types the same opportunity, using the
# "<type>-storage" handler key.
177 foreach my $storage_type
178 ( @{$self->{'targets'}{$token}{'storage-types'}} )
180 my $storage_string = $storage_type . '-storage';
181 if( ref( $Torrus::Collector::initTarget{$storage_string} ) )
183 &{$Torrus::Collector::initTarget{$storage_string}}($self,
# NOTE(review): the condition guarding this cleanup is not shown --
# presumably it runs when initialization failed ($ok false); confirm.
191 $self->deleteTarget( $token );
# Copy into $self->{'targets'}{$token}{'params'} the parameters listed in
# %Torrus::Collector::params{$type}, reading each one from the config tree.
# In a parameter map, an entry's value is either a plain default value or a
# hash reference keyed by the values the parameter may take.
# NOTE(review): declarations of $self, $token and $type, the else branches
# and closing braces are not shown in this listing.
199 my $config_tree = shift;
203 if( not defined( $Torrus::Collector::params{$type} ) )
205 Error("\%Torrus::Collector::params does not have member $type");
# Reference to the slot holding the params hash; written through below as
# $$ref->{...}.
209 my $ref = \$self->{'targets'}{$token}{'params'};
# Maps still to be processed, seeded with the map for $type.
211 my @maps = ( $Torrus::Collector::params{$type} );
213 while( scalar( @maps ) > 0 )
215 &Torrus::DB::checkInterrupted();
218 foreach my $map ( @maps )
220 foreach my $param ( keys %{$map} )
222 my $value = $config_tree->getNodeParam( $token, $param );
# Hash-reference entry: the configured value must be one of its keys.
224 if( ref( $map->{$param} ) )
228 if( exists $map->{$param}->{$value} )
# NOTE(review): the statement that consumes the sub-entry below starts on a
# line not shown -- presumably it queues a nested map for a later pass.
230 if( defined $map->{$param}->{$value} )
233 $map->{$param}->{$value} );
238 Error("Parameter $param has unknown value: " .
239 $value . " in " . $self->path($token));
245 if( not defined $value )
247 # We know the default value
248 $value = $map->{$param};
251 # Finally store the value
254 $$ref->{$param} = $value;
# Read each parameter named in @params from the config tree for $token and
# store it under $self->{'targets'}{$token}{'params'}.
# NOTE(review): the sub header, the remaining argument unpacking and any
# condition around the final store are not shown in this listing.
266 my $config_tree = shift;
270 &Torrus::DB::checkInterrupted();
272 my $ref = \$self->{'targets'}{$token}{'params'};
274 foreach my $param ( @params )
276 my $value = $config_tree->getNodeParam( $token, $param );
279 $$ref->{$param} = $value;
# The three statements below are the bodies of separate accessors whose
# headers are not shown in this listing.
# Getter: the stored value of parameter $param for $token.
291 return $self->{'targets'}{$token}{'params'}{$param};
# Setter: overwrite the stored value of parameter $param for $token.
301 $self->{'targets'}{$token}{'params'}{$param} = $value;
# Getter: the path string recorded for $token.
310 return $self->{'targets'}{$token}{'path'};
# Collect the tokens of all registered targets whose stored 'type' equals
# the given collector type.
# NOTE(review): the declaration and return of @ret are not shown in this
# listing.
313 sub listCollectorTargets
316 my $collector_type = shift;
319 foreach my $token ( keys %{$self->{'targets'}} )
321 if( $self->{'targets'}{$token}{'type'} eq $collector_type )
323 push( @ret, $token );
329 # A callback procedure that will be executed on deleteTarget()
# Callbacks are kept per token in the array
# $self->{'targets'}{$token}{'deleteProc'}; $proc is appended to it.
331 sub registerDeleteCallback
# Create the callback list on first use.
337 if( not ref( $self->{'targets'}{$token}{'deleteProc'} ) )
339 $self->{'targets'}{$token}{'deleteProc'} = [];
341 push( @{$self->{'targets'}{$token}{'deleteProc'}}, $proc );
# Remove a token from this collector: log it, call every registered delete
# callback with ($self, $token), then drop the token's entry from
# $self->{'targets'}.
349 &Torrus::DB::checkInterrupted();
351 Info('Deleting target: ' . $self->path($token));
353 if( ref( $self->{'targets'}{$token}{'deleteProc'} ) )
355 foreach my $proc ( @{$self->{'targets'}{$token}{'deleteProc'}} )
357 &{$proc}( $self, $token );
# Deleting the entry also discards the token's params, local data and
# callback list, which all live under it.
360 delete $self->{'targets'}{$token};
# The three accessors below each return a hash reference stored on the
# object; their sub headers are not shown in this listing.
363 # Returns a reference to token-specific local data
370 return $self->{'targets'}{$token}{'local'};
373 # Returns a reference to collector type-specific local data
380 return $self->{'types'}{$type};
383 # Returns a reference to storage type-specific local data
390 return $self->{'storage'}{$type};
394 # Runs each collector type, and then stores the values
# Three passes over the registered types: run the collectors, store the
# data, then post-process. Types whose "in use" flag is false are skipped.
# For a handler flagged in %Torrus::Collector::needsConfigTree, a ConfigTree
# object is placed in $self->{'config_tree'} before the call and undefined
# again afterwards.
# NOTE(review): the remaining ConfigTree constructor arguments and all
# braces are not shown in this listing.
399 undef $self->{'values'};
# Pass 1: run the collector handler of every type in use.
401 while( my ($collector_type, $ref) = each %{$self->{'types'}} )
403 next unless $self->{'types_in_use'}{$collector_type};
405 &Torrus::DB::checkInterrupted();
407 if( $Torrus::Collector::needsConfigTree
408 {$collector_type}{'runCollector'} )
410 $self->{'config_tree'} =
411 new Torrus::ConfigTree( -TreeName => $self->{'tree_name'},
415 &{$Torrus::Collector::runCollector{$collector_type}}( $self, $ref );
417 if( defined( $self->{'config_tree'} ) )
419 undef $self->{'config_tree'};
# Pass 2: run the storeData handler of every storage type in use.
423 while( my ($storage_type, $ref) = each %{$self->{'storage'}} )
425 next unless $self->{'storage_in_use'}{$storage_type};
427 &Torrus::DB::checkInterrupted();
429 if( $Torrus::Collector::needsConfigTree
430 {$storage_type}{'storeData'} )
432 $self->{'config_tree'} =
433 new Torrus::ConfigTree( -TreeName => $self->{'tree_name'},
437 &{$Torrus::Collector::storeData{$storage_type}}( $self, $ref );
439 if( defined( $self->{'config_tree'} ) )
441 undef $self->{'config_tree'};
# Pass 3: optional postProcess handler, only for collector types that
# registered one.
445 while( my ($collector_type, $ref) = each %{$self->{'types'}} )
447 next unless $self->{'types_in_use'}{$collector_type};
449 if( ref( $Torrus::Collector::postProcess{$collector_type} ) )
451 &Torrus::DB::checkInterrupted();
453 if( $Torrus::Collector::needsConfigTree
454 {$collector_type}{'postProcess'} )
456 $self->{'config_tree'} =
457 new Torrus::ConfigTree( -TreeName => $self->{'tree_name'},
461 &{$Torrus::Collector::postProcess{$collector_type}}( $self, $ref );
463 if( defined( $self->{'config_tree'} ) )
465 undef $self->{'config_tree'};
472 # This procedure is called by the collector type-specific functions
473 # every time there's a new value for a token
# Processing order shown here: an optional transformation (Perl code from
# the config) or, failing that, an optional value map; then optional RPN
# scaling; finally the value is handed to the setValue handler of each of
# the token's storage types.
# NOTE(review): unpacking of $self, $token and $value, the else branches
# and closing braces are not shown in this listing.
479 my $timestamp = shift;
484 if( defined( my $code = $self->{'targets'}{$token}{'transform'} ) )
486 # The stored code spells '$' as DOLLAR and '%' as MOD; restore both
487 $code =~ s/DOLLAR/\$/gm;
488 $code =~ s/MOD/\%/gm;
489 Debug('Value before transformation: ' . $value);
# NOTE(review): string eval of configuration-supplied code; it is only as
# trustworthy as the config tree contents.
491 $value = do { eval $code };
494 Error('Fatal error in transformation code: ' . $@ );
# The hyphen is escaped so that it is a literal: unescaped, "+-e" formed a
# range that also admitted uppercase letters and most punctuation.
497 elsif( $value !~ /^[0-9.+\-eE]+$/o and $value ne 'U' )
499 Error('Non-numeric value after transformation: ' . $value);
# Value map: an exact match wins, otherwise the '_' entry acts as default.
503 elsif( defined( my $map = $self->{'targets'}{$token}{'value-map'} ) )
506 if( defined( $map->{$value} ) )
508 $newValue = $map->{$value};
510 elsif( defined( $map->{'_'} ) )
512 $newValue = $map->{'_'};
516 Warn('Could not find value mapping for ' . $value .
517 ' in ' . $self->path($token));
520 if( defined( $newValue ) )
522 Debug('Value mapping: ' . $value . ' -> ' . $newValue);
# Scaling: the value is prepended to the configured RPN expression and the
# result of evaluating it replaces the value.
527 if( defined( $self->{'targets'}{$token}{'scalerpn'} ) )
529 Debug('Value before scaling: ' . $value);
530 my $rpn = new Torrus::RPN;
531 $value = $rpn->run( $value . ',' .
532 $self->{'targets'}{$token}{'scalerpn'},
539 Debug('Value ' . $value . ' set for ' .
540 $self->path($token) . ' TS=' . $timestamp);
# Hand the final value to every storage type configured for the token.
543 foreach my $storage_type
544 ( @{$self->{'targets'}{$token}{'storage-types'}} )
546 &{$Torrus::Collector::setValue{$storage_type}}( $self, $token,
# Return the ConfigTree object currently held in $self->{'config_tree'}.
# When none is set, an error is logged instead.
# NOTE(review): the sub header and the else branch structure are not shown.
557 if( defined( $self->{'config_tree'} ) )
559 return $self->{'config_tree'};
563 Error('Cannot provide ConfigTree object');
569 ####### Collector scheduler ########
571 package Torrus::CollectorScheduler;
572 @Torrus::CollectorScheduler::ISA = qw(Torrus::Scheduler);
574 use Torrus::ConfigTree;
576 use Torrus::Scheduler;
577 use Torrus::TimeStamp;
# Scheduler fragment: (re)build the collector tasks for this instance when
# the configuration timestamp is not older than the cached one, or when
# targets have not been initialized yet.
# NOTE(review): the sub header, several declarations ($db_tokens, $targets,
# $collector), statement continuations and braces are not shown here.
584 &Torrus::DB::checkInterrupted();
586 my $tree = $self->treeName();
587 my $config_tree = new Torrus::ConfigTree(-TreeName => $tree, -Wait => 1);
588 if( not defined( $config_tree ) )
593 my $data = $self->data();
595 my $instance = $self->{'options'}{'-Instance'};
597 # Prepare the list of tokens, sorted by period and offset,
598 # from config tree or from cache.
600 my $need_new_tasks = 0;
# The cache timestamp is kept per tree and collector instance.
602 Torrus::TimeStamp::init();
603 my $timestamp_key = $tree . ':' . $instance . ':collector_cache';
604 my $known_ts = Torrus::TimeStamp::get( $timestamp_key );
605 my $actual_ts = $config_tree->getTimestamp();
607 if( $actual_ts >= $known_ts or not $data->{'targets_initialized'} )
609 Info('Initializing tasks for collector instance ' . $instance);
610 Debug("Config TS: $actual_ts, Collector TS: $known_ts");
611 my $init_start = time();
616 new Torrus::DB('collector_tokens' . '_' . $instance . '_' .
617 $config_tree->{'ds_config_instance'},
# Each database record maps a token to a "period:offset" schedule string;
# tokens are grouped as $targets->{period}{offset} = [ tokens ].
620 my $cursor = $db_tokens->cursor();
621 while( my ($token, $schedule) = $db_tokens->next($cursor) )
623 my ($period, $offset) = split(/:/o, $schedule);
624 if( not exists( $targets->{$period}{$offset} ) )
626 $targets->{$period}{$offset} = [];
628 push( @{$targets->{$period}{$offset}}, $token );
630 &Torrus::DB::checkInterrupted();
633 $db_tokens->closeNow();
636 &Torrus::DB::checkInterrupted();
# Record the time of this (re)initialization under the cache key.
639 &Torrus::TimeStamp::setNow( $timestamp_key );
# One Torrus::Collector task per (period, offset) pair, holding all tokens
# scheduled for that pair.
643 foreach my $period ( keys %{$targets} )
645 foreach my $offset ( keys %{$targets->{$period}} )
648 new Torrus::Collector( -Period => $period,
651 -Instance => $instance );
653 foreach my $token ( @{$targets->{$period}{$offset}} )
655 &Torrus::DB::checkInterrupted();
656 $collector->addTarget( $config_tree, $token );
659 $self->addTask( $collector );
662 Verbose(sprintf("Tasks initialization finished in %d seconds",
663 time() - $init_start));
665 $data->{'targets_initialized'} = 1;
666 Info('Tasks for collector instance ' . $instance . ' initialized');
# Per-type global initialization, called with the tree name and instance
# for every collector type that registered an initCollectorGlobals handler.
668 foreach my $collector_type ( keys %Torrus::Collector::collectorTypes )
670 if( ref($Torrus::Collector::initCollectorGlobals{
673 &{$Torrus::Collector::initCollectorGlobals{
674 $collector_type}}($tree, $instance);
676 Verbose('Initialized collector globals for type: ' .
682 Torrus::TimeStamp::release();
693 # indent-tabs-mode: nil
694 # perl-indent-level: 4