diff options
Diffstat (limited to 'torrus/perllib/Torrus/Collector.pm')
| -rw-r--r-- | torrus/perllib/Torrus/Collector.pm | 695 |
1 files changed, 695 insertions, 0 deletions
diff --git a/torrus/perllib/Torrus/Collector.pm b/torrus/perllib/Torrus/Collector.pm new file mode 100644 index 000000000..0789be05f --- /dev/null +++ b/torrus/perllib/Torrus/Collector.pm @@ -0,0 +1,695 @@ +# Copyright (C) 2002 Stanislav Sinyagin +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. + +# $Id: Collector.pm,v 1.1 2010-12-27 00:03:38 ivan Exp $ +# Stanislav Sinyagin <ssinyagin@yahoo.com> + + +package Torrus::Collector; +@Torrus::Collector::ISA = qw(Torrus::Scheduler::PeriodicTask); + +use strict; +use Torrus::ConfigTree; +use Torrus::Log; +use Torrus::RPN; +use Torrus::Scheduler; + +BEGIN +{ + foreach my $mod ( @Torrus::Collector::loadModules ) + { + eval( 'require ' . $mod ); + die( $@ ) if $@; + } +} + +# Executed once after the fork. Here modules can launch processing threads +sub initThreads +{ + foreach my $key ( %Torrus::Collector::initThreadsHandlers ) + { + if( ref( $Torrus::Collector::initThreadsHandlers{$key} ) ) + { + &{$Torrus::Collector::initThreadsHandlers{$key}}(); + } + } +} + + +## One collector module instance holds all leaf tokens which +## must be collected at the same time. + +sub new +{ + my $proto = shift; + my %options = @_; + + if( not $options{'-Name'} ) + { + $options{'-Name'} = "Collector"; + } + + my $class = ref($proto) || $proto; + my $self = $class->SUPER::new( %options ); + bless $self, $class; + + foreach my $collector_type ( keys %Torrus::Collector::collectorTypes ) + { + $self->{'types'}{$collector_type} = {}; + $self->{'types_in_use'}{$collector_type} = 0; + } + + foreach my $storage_type ( keys %Torrus::Collector::storageTypes ) + { + $self->{'storage'}{$storage_type} = {}; + $self->{'storage_in_use'}{$storage_type} = 0; + + my $storage_string = $storage_type . '-storage'; + if( ref( $Torrus::Collector::initStorage{$storage_string} ) ) + { + &{$Torrus::Collector::initStorage{$storage_string}}($self); + } + } + + $self->{'tree_name'} = $options{'-TreeName'}; + + return $self; +} + + +sub addTarget +{ + my $self = shift; + my $config_tree = shift; + my $token = shift; + + my $ok = 1; + $self->{'targets'}{$token}{'path'} = $config_tree->path($token); + + my $collector_type = $config_tree->getNodeParam($token, 'collector-type'); + if( not $Torrus::Collector::collectorTypes{$collector_type} ) + { + Error('Unknown collector type: ' . $collector_type); + return; + } + + $self->fetchParams($config_tree, $token, $collector_type); + + $self->{'targets'}{$token}{'type'} = $collector_type; + $self->{'types_in_use'}{$collector_type} = 1; + + my $storage_types = $config_tree->getNodeParam($token, 'storage-type'); + foreach my $storage_type ( split( ',', $storage_types ) ) + { + if( not $Torrus::Collector::storageTypes{$storage_type} ) + { + Error('Unknown storage type: ' . $storage_type); + } + else + { + my $storage_string = $storage_type . '-storage'; + if( not exists( $self->{'targets'}{$token}{'storage-types'} ) ) + { + $self->{'targets'}{$token}{'storage-types'} = []; + } + push( @{$self->{'targets'}{$token}{'storage-types'}}, + $storage_type ); + + $self->fetchParams($config_tree, $token, $storage_string); + $self->{'storage_in_use'}{$storage_type} = 1; + } + } + + # If specified, store the value transformation code + my $code = $config_tree->getNodeParam($token, 'transform-value'); + if( defined $code ) + { + $self->{'targets'}{$token}{'transform'} = $code; + } + + # If specified, store the scale RPN + my $scalerpn = $config_tree->getNodeParam($token, 'collector-scale'); + if( defined $scalerpn ) + { + $self->{'targets'}{$token}{'scalerpn'} = $scalerpn; + } + + # If specified, store the value map + my $valueMap = $config_tree->getNodeParam($token, 'value-map'); + if( defined $valueMap and length($valueMap) > 0 ) + { + my $map = {}; + foreach my $item ( split( ',', $valueMap ) ) + { + my ($key, $value) = split( ':', $item ); + $map->{$key} = $value; + } + $self->{'targets'}{$token}{'value-map'} = $map; + } + + # Initialize local token, collectpor, and storage data + if( not defined $self->{'targets'}{$token}{'local'} ) + { + $self->{'targets'}{$token}{'local'} = {}; + } + + if( ref( $Torrus::Collector::initTarget{$collector_type} ) ) + { + $ok = &{$Torrus::Collector::initTarget{$collector_type}}($self, + $token); + } + + if( $ok ) + { + foreach my $storage_type + ( @{$self->{'targets'}{$token}{'storage-types'}} ) + { + my $storage_string = $storage_type . '-storage'; + if( ref( $Torrus::Collector::initTarget{$storage_string} ) ) + { + &{$Torrus::Collector::initTarget{$storage_string}}($self, + $token); + } + } + } + + if( not $ok ) + { + $self->deleteTarget( $token ); + } +} + + +sub fetchParams +{ + my $self = shift; + my $config_tree = shift; + my $token = shift; + my $type = shift; + + if( not defined( $Torrus::Collector::params{$type} ) ) + { + Error("\%Torrus::Collector::params does not have member $type"); + return; + } + + my $ref = \$self->{'targets'}{$token}{'params'}; + + my @maps = ( $Torrus::Collector::params{$type} ); + + while( scalar( @maps ) > 0 ) + { + &Torrus::DB::checkInterrupted(); + + my @next_maps = (); + foreach my $map ( @maps ) + { + foreach my $param ( keys %{$map} ) + { + my $value = $config_tree->getNodeParam( $token, $param ); + + if( ref( $map->{$param} ) ) + { + if( defined $value ) + { + if( exists $map->{$param}->{$value} ) + { + if( defined $map->{$param}->{$value} ) + { + push( @next_maps, + $map->{$param}->{$value} ); + } + } + else + { + Error("Parameter $param has unknown value: " . + $value . " in " . $self->path($token)); + } + } + } + else + { + if( not defined $value ) + { + # We know the default value + $value = $map->{$param}; + } + } + # Finally store the value + if( defined $value ) + { + $$ref->{$param} = $value; + } + } + } + @maps = @next_maps; + } +} + + +sub fetchMoreParams +{ + my $self = shift; + my $config_tree = shift; + my $token = shift; + my @params = @_; + + &Torrus::DB::checkInterrupted(); + + my $ref = \$self->{'targets'}{$token}{'params'}; + + foreach my $param ( @params ) + { + my $value = $config_tree->getNodeParam( $token, $param ); + if( defined $value ) + { + $$ref->{$param} = $value; + } + } +} + + +sub param +{ + my $self = shift; + my $token = shift; + my $param = shift; + + return $self->{'targets'}{$token}{'params'}{$param}; +} + +sub setParam +{ + my $self = shift; + my $token = shift; + my $param = shift; + my $value = shift; + + $self->{'targets'}{$token}{'params'}{$param} = $value; +} + + +sub path +{ + my $self = shift; + my $token = shift; + + return $self->{'targets'}{$token}{'path'}; +} + +sub listCollectorTargets +{ + my $self = shift; + my $collector_type = shift; + + my @ret; + foreach my $token ( keys %{$self->{'targets'}} ) + { + if( $self->{'targets'}{$token}{'type'} eq $collector_type ) + { + push( @ret, $token ); + } + } + return @ret; +} + +# A callback procedure that will be executed on deleteTarget() + +sub registerDeleteCallback +{ + my $self = shift; + my $token = shift; + my $proc = shift; + + if( not ref( $self->{'targets'}{$token}{'deleteProc'} ) ) + { + $self->{'targets'}{$token}{'deleteProc'} = []; + } + push( @{$self->{'targets'}{$token}{'deleteProc'}}, $proc ); +} + +sub deleteTarget +{ + my $self = shift; + my $token = shift; + + &Torrus::DB::checkInterrupted(); + + Info('Deleting target: ' . $self->path($token)); + + if( ref( $self->{'targets'}{$token}{'deleteProc'} ) ) + { + foreach my $proc ( @{$self->{'targets'}{$token}{'deleteProc'}} ) + { + &{$proc}( $self, $token ); + } + } + delete $self->{'targets'}{$token}; +} + +# Returns a reference to token-specific local data + +sub tokenData +{ + my $self = shift; + my $token = shift; + + return $self->{'targets'}{$token}{'local'}; +} + +# Returns a reference to collector type-specific local data + +sub collectorData +{ + my $self = shift; + my $type = shift; + + return $self->{'types'}{$type}; +} + +# Returns a reference to storage type-specific local data + +sub storageData +{ + my $self = shift; + my $type = shift; + + return $self->{'storage'}{$type}; +} + + +# Runs each collector type, and then stores the values +sub run +{ + my $self = shift; + + undef $self->{'values'}; + + while( my ($collector_type, $ref) = each %{$self->{'types'}} ) + { + next unless $self->{'types_in_use'}{$collector_type}; + + &Torrus::DB::checkInterrupted(); + + if( $Torrus::Collector::needsConfigTree + {$collector_type}{'runCollector'} ) + { + $self->{'config_tree'} = + new Torrus::ConfigTree( -TreeName => $self->{'tree_name'}, + -Wait => 1 ); + } + + &{$Torrus::Collector::runCollector{$collector_type}}( $self, $ref ); + + if( defined( $self->{'config_tree'} ) ) + { + undef $self->{'config_tree'}; + } + } + + while( my ($storage_type, $ref) = each %{$self->{'storage'}} ) + { + next unless $self->{'storage_in_use'}{$storage_type}; + + &Torrus::DB::checkInterrupted(); + + if( $Torrus::Collector::needsConfigTree + {$storage_type}{'storeData'} ) + { + $self->{'config_tree'} = + new Torrus::ConfigTree( -TreeName => $self->{'tree_name'}, + -Wait => 1 ); + } + + &{$Torrus::Collector::storeData{$storage_type}}( $self, $ref ); + + if( defined( $self->{'config_tree'} ) ) + { + undef $self->{'config_tree'}; + } + } + + while( my ($collector_type, $ref) = each %{$self->{'types'}} ) + { + next unless $self->{'types_in_use'}{$collector_type}; + + if( ref( $Torrus::Collector::postProcess{$collector_type} ) ) + { + &Torrus::DB::checkInterrupted(); + + if( $Torrus::Collector::needsConfigTree + {$collector_type}{'postProcess'} ) + { + $self->{'config_tree'} = + new Torrus::ConfigTree( -TreeName => $self->{'tree_name'}, + -Wait => 1 ); + } + + &{$Torrus::Collector::postProcess{$collector_type}}( $self, $ref ); + + if( defined( $self->{'config_tree'} ) ) + { + undef $self->{'config_tree'}; + } + } + } +} + + +# This procedure is called by the collector type-specific functions +# every time there's a new value for a token +sub setValue +{ + my $self = shift; + my $token = shift; + my $value = shift; + my $timestamp = shift; + my $uptime = shift; + + if( $value ne 'U' ) + { + if( defined( my $code = $self->{'targets'}{$token}{'transform'} ) ) + { + # Screen out the percent sign and $_ + $code =~ s/DOLLAR/\$/gm; + $code =~ s/MOD/\%/gm; + Debug('Value before transformation: ' . $value); + $_ = $value; + $value = do { eval $code }; + if( $@ ) + { + Error('Fatal error in transformation code: ' . $@ ); + $value = 'U'; + } + elsif( $value !~ /^[0-9.+-eE]+$/o and $value ne 'U' ) + { + Error('Non-numeric value after transformation: ' . $value); + $value = 'U'; + } + } + elsif( defined( my $map = $self->{'targets'}{$token}{'value-map'} ) ) + { + my $newValue; + if( defined( $map->{$value} ) ) + { + $newValue = $map->{$value}; + } + elsif( defined( $map->{'_'} ) ) + { + $newValue = $map->{'_'}; + } + else + { + Warn('Could not find value mapping for ' . $value . + 'in ' . $self->path($token)); + } + + if( defined( $newValue ) ) + { + Debug('Value mapping: ' . $value . ' -> ' . $newValue); + $value = $newValue; + } + } + + if( defined( $self->{'targets'}{$token}{'scalerpn'} ) ) + { + Debug('Value before scaling: ' . $value); + my $rpn = new Torrus::RPN; + $value = $rpn->run( $value . ',' . + $self->{'targets'}{$token}{'scalerpn'}, + sub{} ); + } + } + + if( isDebug() ) + { + Debug('Value ' . $value . ' set for ' . + $self->path($token) . ' TS=' . $timestamp); + } + + foreach my $storage_type + ( @{$self->{'targets'}{$token}{'storage-types'}} ) + { + &{$Torrus::Collector::setValue{$storage_type}}( $self, $token, + $value, $timestamp, + $uptime ); + } +} + + +sub configTree +{ + my $self = shift; + + if( defined( $self->{'config_tree'} ) ) + { + return $self->{'config_tree'}; + } + else + { + Error('Cannot provide ConfigTree object'); + return undef; + } +} + + +####### Collector scheduler ######## + +package Torrus::CollectorScheduler; +@Torrus::CollectorScheduler::ISA = qw(Torrus::Scheduler); + +use Torrus::ConfigTree; +use Torrus::Log; +use Torrus::Scheduler; +use Torrus::TimeStamp; + + +sub beforeRun +{ + my $self = shift; + + &Torrus::DB::checkInterrupted(); + + my $tree = $self->treeName(); + my $config_tree = new Torrus::ConfigTree(-TreeName => $tree, -Wait => 1); + if( not defined( $config_tree ) ) + { + return undef; + } + + my $data = $self->data(); + + my $instance = $self->{'options'}{'-Instance'}; + + # Prepare the list of tokens, sorted by period and offset, + # from config tree or from cache. + + my $need_new_tasks = 0; + + Torrus::TimeStamp::init(); + my $timestamp_key = $tree . ':' . $instance . ':collector_cache'; + my $known_ts = Torrus::TimeStamp::get( $timestamp_key ); + my $actual_ts = $config_tree->getTimestamp(); + + if( $actual_ts >= $known_ts or not $data->{'targets_initialized'} ) + { + Info('Initializing tasks for collector instance ' . $instance); + Debug("Config TS: $actual_ts, Collector TS: $known_ts"); + my $init_start = time(); + + my $targets = {}; + + my $db_tokens = + new Torrus::DB('collector_tokens' . '_' . $instance . '_' . + $config_tree->{'ds_config_instance'}, + -Subdir => $tree); + + my $cursor = $db_tokens->cursor(); + while( my ($token, $schedule) = $db_tokens->next($cursor) ) + { + my ($period, $offset) = split(/:/o, $schedule); + if( not exists( $targets->{$period}{$offset} ) ) + { + $targets->{$period}{$offset} = []; + } + push( @{$targets->{$period}{$offset}}, $token ); + + &Torrus::DB::checkInterrupted(); + } + undef $cursor; + $db_tokens->closeNow(); + undef $db_tokens; + + &Torrus::DB::checkInterrupted(); + + # Set the timestamp + &Torrus::TimeStamp::setNow( $timestamp_key ); + + $self->flushTasks(); + + foreach my $period ( keys %{$targets} ) + { + foreach my $offset ( keys %{$targets->{$period}} ) + { + my $collector = + new Torrus::Collector( -Period => $period, + -Offset => $offset, + -TreeName => $tree, + -Instance => $instance ); + + foreach my $token ( @{$targets->{$period}{$offset}} ) + { + &Torrus::DB::checkInterrupted(); + $collector->addTarget( $config_tree, $token ); + } + + $self->addTask( $collector ); + } + } + Verbose(sprintf("Tasks initialization finished in %d seconds", + time() - $init_start)); + + $data->{'targets_initialized'} = 1; + Info('Tasks for collector instance ' . $instance . ' initialized'); + + foreach my $collector_type ( keys %Torrus::Collector::collectorTypes ) + { + if( ref($Torrus::Collector::initCollectorGlobals{ + $collector_type}) ) + { + &{$Torrus::Collector::initCollectorGlobals{ + $collector_type}}($tree, $instance); + + Verbose('Initialized collector globals for type: ' . + $collector_type); + } + } + } + + Torrus::TimeStamp::release(); + + return 1; +} + + +1; + + +# Local Variables: +# mode: perl +# indent-tabs-mode: nil +# perl-indent-level: 4 +# End: |
