From 4ee28b411fa819a420926b8a6e60be043fdb1d2e Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 17 Apr 2009 19:50:11 +0000 Subject: add priority to job queue so billing jobs don't don't drown out provisioning jobs --- FS/bin/freeside-queued | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'FS/bin/freeside-queued') diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index d4f09c18d..8c07638b3 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -84,9 +84,12 @@ while (1) { my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. ' WHERE queue_depend.jobnum = queue.jobnum )'; - my $order_by = "ORDER BY jobnum ". ( driver_name eq 'mysql' - ? 'LIMIT 1 FOR UPDATE' - : 'FOR UPDATE LIMIT 1' ); + #anything with a priority goes after stuff without one + my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + + $order_by .= ( driver_name eq 'mysql' + ? ' LIMIT 1 FOR UPDATE ' + : ' FOR UPDATE LIMIT 1 ' ); my $job = qsearchs({ 'table' => 'queue', -- cgit v1.2.1 From a80720cc443697b1be73e0570fa40d592ccbe8bd Mon Sep 17 00:00:00 2001 From: ivan Date: Sun, 26 Apr 2009 23:19:13 +0000 Subject: start small jobs more efficiently, RT#4412 --- FS/bin/freeside-queued | 164 +++++++++++++++++++++++++------------------------ 1 file changed, 84 insertions(+), 80 deletions(-) (limited to 'FS/bin/freeside-queued') diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 8c07638b3..2188dd404 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -6,7 +6,8 @@ use POSIX qw(:sys_wait_h); use IO::File; use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); -use FS::Record qw(qsearch qsearchs); +use FS::Conf; +use FS::Record qw(qsearch); use FS::queue; use FS::queue_depend; @@ -15,7 +16,6 @@ use Net::SSH 0.07; $DEBUG = 0; -$max_kids = '10'; #guess it should be a config file... $kids = 0; my $user = shift or die &usage; @@ -27,7 +27,6 @@ daemonize1('freeside-queued'); warn "dropping privledges\n" if $DEBUG; drop_root(); - $ENV{HOME} = (getpwuid($>))[7]; #for ssh warn "connecting to database\n" if $DEBUG; @@ -48,6 +47,9 @@ daemonize2(); #-- +my $conf = new FS::Conf; +$max_kids = $conf->config('queued-max_kids') || 10; + my $warnkids=0; while (1) { @@ -81,117 +83,119 @@ while (1) { # local $FS::UID::AutoCommit = 0; $FS::UID::AutoCommit = 0; - my $nodepend = 'AND 0 = ( SELECT COUNT(*) FROM queue_depend'. + my $nodepend = 'AND NOT EXISTS( SELECT 1 FROM queue_depend'. ' WHERE queue_depend.jobnum = queue.jobnum )'; #anything with a priority goes after stuff without one my $order_by = ' ORDER BY COALESCE(priority,0) ASC, jobnum ASC '; + my $limit = $max_kids - $kids; + $order_by .= ( driver_name eq 'mysql' - ? ' LIMIT 1 FOR UPDATE ' - : ' FOR UPDATE LIMIT 1 ' ); + ? " LIMIT $limit FOR UPDATE " + : " FOR UPDATE LIMIT $limit " ); - my $job = qsearchs({ + my @jobs = qsearch({ 'table' => 'queue', 'hashref' => { 'status' => 'new' }, 'extra_sql' => $nodepend, 'order_by' => $order_by, - }) or do { - # if $oldAutoCommit { + }); + + unless ( @jobs ) { dbh->commit or do { warn "WARNING: database error, closing connection: ". dbh->errstr; undef $FS::UID::dbh; next; }; - # } sleep 1; next; - }; - - my %hash = $job->hash; - $hash{'status'} = 'locked'; - my $ljob = new FS::queue ( \%hash ); - my $error = $ljob->replace($job); - if ( $error ) { - warn "WARNING: database error locking job, closing connection: ". - dbh->errstr; - undef $FS::UID::dbh; - next; } - # if $oldAutoCommit { - dbh->commit or do { - warn "WARNING: database error, closing connection: ". dbh->errstr; - undef $FS::UID::dbh; - next; - }; - # } - - $FS::UID::AutoCommit = 1; - #} - - my @args = $ljob->args; - splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + foreach my $job ( @jobs ) { - defined( my $pid = fork ) or do { - warn "WARNING: can't fork: $!\n"; my %hash = $job->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + $hash{'status'} = 'locked'; my $ljob = new FS::queue ( \%hash ); my $error = $ljob->replace($job); - die $error if $error; - next; #don't increment the kid counter - }; - - if ( $pid ) { - $kids++; - $kids{$pid} = 1; - } else { #kid time - - #get new db handle - $FS::UID::dbh->{InactiveDestroy} = 1; - - forksuidsetup($user); - - #auto-use classes... - #if ( $ljob->job =~ /(FS::part_export::\w+)::/ ) { - if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ - || $ljob->job =~ /(FS::\w+)::/ - ) - { - my $class = $1; - eval "use $class;"; + if ( $error ) { + warn "WARNING: database error locking job, closing connection: ". + dbh->errstr; + undef $FS::UID::dbh; + next; + } + + dbh->commit or do { + warn "WARNING: database error, closing connection: ". dbh->errstr; + undef $FS::UID::dbh; + next; + }; + + $FS::UID::AutoCommit = 1; + + my @args = $ljob->args; + splice @args, 0, 1, $ljob if $args[0] eq '_JOB'; + + defined( my $pid = fork ) or do { + warn "WARNING: can't fork: $!\n"; + my %hash = $job->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = "[freeside-queued] can't fork: $!"; + my $ljob = new FS::queue ( \%hash ); + my $error = $ljob->replace($job); + die $error if $error; + next; #don't increment the kid counter + }; + + if ( $pid ) { + $kids++; + $kids{$pid} = 1; + } else { #kid time + + #get new db handle + $FS::UID::dbh->{InactiveDestroy} = 1; + + forksuidsetup($user); + + #auto-use classes... + if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ + || $ljob->job =~ /(FS::\w+)::/ + ) + { + my $class = $1; + eval "use $class;"; + if ( $@ ) { + warn "job use $class failed"; + my %hash = $ljob->hash; + $hash{'status'} = 'failed'; + $hash{'statustext'} = $@; + my $fjob = new FS::queue( \%hash ); + my $error = $fjob->replace($ljob); + die $error if $error; + exit; #end-of-kid + }; + } + + my $eval = "&". $ljob->job. '(@args);'; + warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; + eval $eval; #throw away return value? suppose so if ( $@ ) { - warn "job use $class failed"; + warn "job $eval failed"; my %hash = $ljob->hash; $hash{'status'} = 'failed'; $hash{'statustext'} = $@; my $fjob = new FS::queue( \%hash ); my $error = $fjob->replace($ljob); die $error if $error; - exit; #end-of-kid - }; - } + } else { + $ljob->delete; + } - my $eval = "&". $ljob->job. '(@args);'; - warn 'running "&'. $ljob->job. '('. join(', ', @args). ")\n" if $DEBUG; - eval $eval; #throw away return value? suppose so - if ( $@ ) { - warn "job $eval failed"; - my %hash = $ljob->hash; - $hash{'status'} = 'failed'; - $hash{'statustext'} = $@; - my $fjob = new FS::queue( \%hash ); - my $error = $fjob->replace($ljob); - die $error if $error; - } else { - $ljob->delete; + exit; + #end-of-kid } - exit; - #end-of-kid - } + } #foreach my $job } continue { if ( sigterm() ) { -- cgit v1.2.1 From 70d0d44dff39cb9235cbbec1918c4ea95f0dc4c4 Mon Sep 17 00:00:00 2001 From: ivan Date: Wed, 24 Jun 2009 18:36:32 +0000 Subject: add support for db profiling, RT#5662 --- FS/bin/freeside-queued | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'FS/bin/freeside-queued') diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 2188dd404..22fd7bb5e 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -157,6 +157,8 @@ while (1) { forksuidsetup($user); + dbh->{'private_profile'} = {} if UNIVERSAL::can(dbh, 'sprintProfile'); + #auto-use classes... if ( $ljob->job =~ /(FS::(part_export|cust_main)::\w+)::/ || $ljob->job =~ /(FS::\w+)::/ @@ -191,6 +193,13 @@ while (1) { $ljob->delete; } + if ( UNIVERSAL::can(dbh, 'sprintProfile') ) { + open(PROFILE,">%%%FREESIDE_LOG%%%/queueprofile.$$.".time) + or die "can't open profile file: $!"; + print PROFILE dbh->sprintProfile(); + close PROFILE or die "can't close profile file: $!"; + } + exit; #end-of-kid } -- cgit v1.2.1 From 60c6c6b6f471606908a0f862cef8eab15d42972b Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 26 Jun 2009 23:12:45 +0000 Subject: add -s and -n flags to freeside-daily to specify the kinds of jobs to be run, RT#5572 --- FS/bin/freeside-queued | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) (limited to 'FS/bin/freeside-queued') diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 22fd7bb5e..008616fb0 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -4,6 +4,7 @@ use strict; use vars qw( $DEBUG $kids $max_kids %kids ); use POSIX qw(:sys_wait_h); use IO::File; +use Getopt::Std; use FS::UID qw(adminsuidsetup forksuidsetup driver_name dbh myconnect); use FS::Daemon qw(daemonize1 drop_root logfile daemonize2 sigint sigterm); use FS::Conf; @@ -18,6 +19,10 @@ $DEBUG = 0; $kids = 0; +&untaint_argv; #what it sounds like (eww) +use vars qw(%opt); +getopts('sn', \%opt ); + my $user = shift or die &usage; warn "starting daemonization (forking)\n" if $DEBUG; @@ -95,9 +100,16 @@ while (1) { ? " LIMIT $limit FOR UPDATE " : " FOR UPDATE LIMIT $limit " ); + my $hashref = { 'status' => 'new' }; + if ( $opt{'s'} ) { + $hashref->{'status'} = 'Y'; + } elsif ( $opt{'n'} ) { + $hashref->{'status'} = ''; + } + my @jobs = qsearch({ 'table' => 'queue', - 'hashref' => { 'status' => 'new' }, + 'hashref' => $hashref, 'extra_sql' => $nodepend, 'order_by' => $order_by, }); @@ -217,6 +229,15 @@ while (1) { } } +sub untaint_argv { + foreach $_ ( $[ .. $#ARGV ) { #untaint @ARGV + #$ARGV[$_] =~ /^([\w\-\/]*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + # Date::Parse + $ARGV[$_] =~ /^(.*)$/ || die "Illegal arguement \"$ARGV[$_]\""; + $ARGV[$_]=$1; + } +} + sub usage { die "Usage:\n\n freeside-queued user\n"; } @@ -237,12 +258,16 @@ freeside-queued - Job queue daemon =head1 SYNOPSIS - freeside-queued user + freeside-queued [ -s | -n ] user =head1 DESCRIPTION Job queue daemon. Should be running at all times. +-s: "secure" jobs only (queued billing jobs) + +-n: non-"secure" jobs only (other jobs) + user: from the mapsecrets file - see config.html from the base documentation =head1 VERSION -- cgit v1.2.1 From 3cd902f4cad7410785e4640fce5d1fc45fb10c5e Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 26 Jun 2009 23:21:45 +0000 Subject: doh, brainfart, RT#5572 --- FS/bin/freeside-queued | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'FS/bin/freeside-queued') diff --git a/FS/bin/freeside-queued b/FS/bin/freeside-queued index 008616fb0..e97a52cab 100644 --- a/FS/bin/freeside-queued +++ b/FS/bin/freeside-queued @@ -102,9 +102,9 @@ while (1) { my $hashref = { 'status' => 'new' }; if ( $opt{'s'} ) { - $hashref->{'status'} = 'Y'; + $hashref->{'secure'} = 'Y'; } elsif ( $opt{'n'} ) { - $hashref->{'status'} = ''; + $hashref->{'secure'} = ''; } my @jobs = qsearch({ -- cgit v1.2.1