diff options
| author | Mark Wells <mark@freeside.biz> | 2014-05-22 22:21:10 -0700 | 
|---|---|---|
| committer | Mark Wells <mark@freeside.biz> | 2014-05-22 22:21:10 -0700 | 
| commit | 38c28f5486c5fb02d1d6edfebf878b10a2a9239e (patch) | |
| tree | 7eaad0943ca24c02821fcc317eb0d0bafd258274 /FS/bin/freeside-upgrade | |
| parent | 1b4b00291c912647c6225f62c2e2be05b9be1f8f (diff) | |
fix bad race conditions in parallel schema upgrade, #29163
Diffstat (limited to 'FS/bin/freeside-upgrade')
| -rwxr-xr-x | FS/bin/freeside-upgrade | 127 | 
1 files changed, 92 insertions, 35 deletions
| diff --git a/FS/bin/freeside-upgrade b/FS/bin/freeside-upgrade index 6c9c24068..c3d070eec 100755 --- a/FS/bin/freeside-upgrade +++ b/FS/bin/freeside-upgrade @@ -182,54 +182,111 @@ if ( $opt_c ) {  } +my $MAX_HANDLES; # undef for now, set it if you want a limit +  if ( $DRY_RUN ) {    print      join(";\n", @statements ). ";\n";    exit; -} else { - -  my @clones = (); -  foreach my $statement ( @statements ) { - -    if ( $opt_a ) { - -      my $clone = ''; -      until ( $clone = $dbh->clone ) { -        sleep 60; #too many database connections?  wait and retry -      } -      until ( $clone->do( $statement, {pg_async=>PG_ASYNC} ) ) { -        sleep 60; #too many ... running queries?  wait and retry -      } -      warn "$statement\n"; -      push @clones, $clone; - +} elsif ( $opt_a ) { + +  my @phases = map { [] } 0..4; +  my $fsupgrade_idx = 1; +  my %idx_map; +  foreach (@statements) { +    if ( /^ *(CREATE|ALTER) +TABLE/ ) { +      # phase 0: CREATE TABLE, ALTER TABLE +      push @{ $phases[0] }, $_; +    } elsif ( /^ *ALTER +INDEX.* RENAME TO dbs_temp(\d+)/ ) { +      # phase 1: rename index to dbs_temp%d +      # (see DBIx::DBSchema::Table) +      # but in this case, uniqueify all the dbs_temps.  This method only works +      # because they are in the right order to begin with... +      my $dbstemp_idx = $1; +      s/dbs_temp$dbstemp_idx/fsupgrade_temp$fsupgrade_idx/; +      $idx_map{ $dbstemp_idx } = $fsupgrade_idx; +      push @{ $phases[1] }, $_; +      $fsupgrade_idx++; +    } elsif ( /^ *(CREATE|DROP)( +UNIQUE)? +INDEX/ ) { +      # phase 2: create/drop indices +      push @{ $phases[2] }, $_; +    } elsif ( /^ *ALTER +INDEX +dbs_temp(\d+) +RENAME/ ) { +      # phase 3: rename temp indices back to real ones +      my $dbstemp_idx = $1; +      my $mapped_idx = $idx_map{ $dbstemp_idx } +        or die "unable to remap dbs_temp$1 RENAME statement"; +      s/dbs_temp$dbstemp_idx/fsupgrade_temp$mapped_idx/; +      push @{ $phases[3] }, $_;      } else { -      warn "$statement\n"; -      $dbh->do( $statement ) -        or die "Error: ". $dbh->errstr. "\n executing: $statement"; +      # phase 4: everything else (CREATE SEQUENCE, SELECT SETVAL, etc.) +      push @{ $phases[4] }, $_;      } -    } - -  warn "Waiting for all schema changes to complete\n" if @clones; # && $DEBUG; -  while ( @clones ) { -    my @newclones = (); -    foreach my $clone ( @clones ) { -      if ( $clone->pg_ready ) { -        $clone->pg_result or die $clone->errstr; -        $clone->commit    or die $clone->errstr; -      } else { -        push @newclones, $clone; +  my $i = 0; +  my @busy = (); +  my @free = (); +  foreach my $phase (@phases) { +    warn "Starting schema changes, phase $i...\n"; +    while (@$phase or @busy) { +      # check status of all running tasks +      my @newbusy; +      my $failed_clone; +      for my $clone (@busy) { +        if ( $clone->pg_ready ) { +          # then clean it up +          my $rv = $clone->pg_result && $clone->commit; +          $failed_clone = $clone if !$rv; +          push @free, $clone; +        } else { +          push @newbusy, $clone; +        }        } -    } -    @clones = @newclones; -    sleep 30 if @clones; -  } +      if ( $failed_clone ) { +        my $errstr = $failed_clone->errstr; +        foreach my $clone (@newbusy, $failed_clone) { +          $clone->pg_cancel if $clone->{pg_async_status} == 1; +          $clone->disconnect; +        } +        die "$errstr\n"; +      } +      @busy = @newbusy; +      if (my $statement = $phase->[0]) { +        my $clone; +        if ( @free ) { +          $clone = shift(@free); +        } elsif ( !$MAX_HANDLES or  +                  scalar(@free) + scalar(@busy) < $MAX_HANDLES ) { +          $clone = $dbh->clone; # this will fail if over the server limit +        } + +        if ( $clone ) { +          my $rv = $clone->do($statement, {pg_async => PG_ASYNC}); +          if ( $rv ) { +            warn "$statement\n"; +            shift @{ $phase }; # and actually take the statement off the queue +            push @busy, $clone; +          } # else I don't know, wait and retry +        } # else too many handles, wait and retry +      } elsif (@busy) { +        # all statements are dispatched +        warn "Waiting for phase $i to complete\n"; +        sleep 30; +      } +    } # while @$phase or @busy +    $i++; +  } # foreach $phase +  warn "Schema changes complete.\n";  #  warn "Pre-schema change upgrades completed in ". (time-$start). " seconds\n"; # if $DEBUG;  #  $start = time;  #  dbdef->update_schema( dbdef_dist(datasrc), $dbh ); +} else { # normal case, run statements sequentially +  foreach my $statement ( @statements ) { +    warn "$statement\n"; +    $dbh->do( $statement ) +      or die "Error: ". $dbh->errstr. "\n executing: $statement"; +  }  }  warn "Schema upgrade completed in ". (time-$start). " seconds\n"; # if $DEBUG; | 
