non-blocking upgrade for part_pkg, #29155
authorMark Wells <mark@freeside.biz>
Thu, 15 May 2014 01:36:30 +0000 (18:36 -0700)
committerMark Wells <mark@freeside.biz>
Thu, 15 May 2014 01:36:30 +0000 (18:36 -0700)
FS/FS/Upgrade.pm
FS/FS/part_pkg.pm

index bf3e2ad..6785a13 100644 (file)
@@ -8,6 +8,7 @@ use File::Slurp;
 use FS::UID qw( dbh driver_name );
 use FS::Conf;
 use FS::Record qw(qsearchs qsearch str2time_sql);
+use FS::queue;
 use FS::upgrade_journal;
 
 use FS::svc_domain;
@@ -153,6 +154,26 @@ sub upgrade {
 
       $class->_upgrade_data(%opt);
 
+      # New interface for async upgrades: a class can declare a 
+      # "queueable_upgrade" method, which will run as part of the normal 
+      # upgrade, but if the -j option is passed, will instead be run from 
+      # the job queue.
+      if ( $class->can('queueable_upgrade') ) {
+        my $jobname = $class . '::queueable_upgrade';
+        my $num_jobs = FS::queue->count("job = '$jobname' and status != 'failed'");
+        if ($num_jobs > 0) {
+          warn "$class upgrade already scheduled.\n";
+        } else {
+          if ( $opt{'queue'} ) {
+            warn "Scheduling $class upgrade.\n";
+            my $job = FS::queue->new({ job => $jobname });
+            $job->insert($class, %opt);
+          } else {
+            $class->queueable_upgrade(%opt);
+          }
+        } #$num_jobs == 0
+      }
+
       if ( $oldAutoCommit ) {
         warn "  committing\n";
         dbh->commit or die dbh->errstr;
index b62b52d..0165455 100644 (file)
@@ -10,6 +10,7 @@ use Time::Local qw( timelocal timelocal_nocheck ); # eventually replace with Dat
 use Tie::IxHash;
 use FS::Conf;
 use FS::Record qw( qsearch qsearchs dbh dbdef );
+use FS::Cursor; # for upgrade
 use FS::pkg_svc;
 use FS::part_svc;
 use FS::cust_pkg;
@@ -1682,16 +1683,20 @@ sub _upgrade_data { # class method
     $part_pkg->replace;
 
   }
+  # the rest can be done asynchronously
+}
 
+sub queueable_upgrade {
   # now upgrade to the explicit custom flag
 
-  @part_pkg = qsearch({
+  my $search = FS::Cursor->new({
     'table'     => 'part_pkg',
     'hashref'   => { disabled => 'Y', custom => '' },
     'extra_sql' => "AND comment LIKE '(CUSTOM) %'",
   });
+  my $dbh = dbh;
 
-  foreach my $part_pkg (@part_pkg) {
+  while (my $part_pkg = $search->fetch) {
     my $new = new FS::part_pkg { $part_pkg->hash };
     $new->custom('Y');
     my $comment = $part_pkg->comment;
@@ -1708,15 +1713,25 @@ sub _upgrade_data { # class method
                                'primary_svc' => $primary,
                                'options'     => $options,
                              );
-    die $error if $error;
+    if ($error) {
+      warn "pkgpart#".$part_pkg->pkgpart.": $error\n";
+      $dbh->rollback;
+    } else {
+      $dbh->commit;
+    }
   }
 
   # set family_pkgpart on any packages that don't have it
-  @part_pkg = qsearch('part_pkg', { 'family_pkgpart' => '' });
-  foreach my $part_pkg (@part_pkg) {
+  $search = FS::Cursor->new('part_pkg', { 'family_pkgpart' => '' });
+  while (my $part_pkg = $search->fetch) {
     $part_pkg->set('family_pkgpart' => $part_pkg->pkgpart);
     my $error = $part_pkg->SUPER::replace;
-    die $error if $error;
+    if ($error) {
+      warn "pkgpart#".$part_pkg->pkgpart.": $error\n";
+      $dbh->rollback;
+    } else {
+      $dbh->commit;
+    }
   }
 
   my @part_pkg_option = qsearch('part_pkg_option',
@@ -1827,7 +1842,7 @@ sub _upgrade_data { # class method
       }
     } # $bad_upgrade exists
     else { # do the original upgrade, but correctly this time
-      @part_pkg = qsearch('part_pkg', {
+      my @part_pkg = qsearch('part_pkg', {
           fcc_ds0s        => { op => '>', value => 0 },
           fcc_voip_class  => ''
       });