+} elsif ( $opt_a ) {
+
+ ###
+ # -a: Run schema changes in parallel (Pg only).
+ ###
+
+ my $MAX_HANDLES; # undef for now, set it if you want a limit
+
+ my @phases = map { [] } 0..4;
+ my $fsupgrade_idx = 1;
+ my %idx_map;
+ foreach (@statements) {
+ if ( /^ *(CREATE|ALTER) +TABLE/ ) {
+ # phase 0: CREATE TABLE, ALTER TABLE
+ push @{ $phases[0] }, $_;
+ } elsif ( /^ *ALTER +INDEX.* RENAME TO dbs_temp(\d+)/ ) {
+ # phase 1: rename index to dbs_temp%d
+ # (see DBIx::DBSchema::Table)
+ # but in this case, uniqueify all the dbs_temps. This method only works
+ # because they are in the right order to begin with...
+ my $dbstemp_idx = $1;
+ s/dbs_temp$dbstemp_idx/fsupgrade_temp$fsupgrade_idx/;
+ $idx_map{ $dbstemp_idx } = $fsupgrade_idx;
+ push @{ $phases[1] }, $_;
+ $fsupgrade_idx++;
+ } elsif ( /^ *(CREATE|DROP)( +UNIQUE)? +INDEX/ ) {
+ # phase 2: create/drop indices
+ push @{ $phases[2] }, $_;
+ } elsif ( /^ *ALTER +INDEX +dbs_temp(\d+) +RENAME/ ) {
+ # phase 3: rename temp indices back to real ones
+ my $dbstemp_idx = $1;
+ my $mapped_idx = $idx_map{ $dbstemp_idx }
+ or die "unable to remap dbs_temp$1 RENAME statement";
+ s/dbs_temp$dbstemp_idx/fsupgrade_temp$mapped_idx/;
+ push @{ $phases[3] }, $_;
+ } else {
+ # phase 4: everything else (CREATE SEQUENCE, SELECT SETVAL, etc.)
+ push @{ $phases[4] }, $_;
+ }
+ }
+ my $i = 0;
+ my @busy = ();
+ my @free = ();
+ foreach my $phase (@phases) {
+ warn "Starting schema changes, phase $i...\n";
+ while (@$phase or @busy) {
+ # check status of all running tasks
+ my @newbusy;
+ my $failed_clone;
+ for my $clone (@busy) {
+ if ( $clone->pg_ready ) {
+ # then clean it up
+ my $rv = $clone->pg_result && $clone->commit;
+ $failed_clone = $clone if !$rv;
+ push @free, $clone;
+ } else {
+ push @newbusy, $clone;
+ }
+ }
+ if ( $failed_clone ) {
+ my $errstr = $failed_clone->errstr;
+ foreach my $clone (@newbusy, $failed_clone) {
+ $clone->pg_cancel if $clone->{pg_async_status} == 1;
+ $clone->disconnect;
+ }
+ die "$errstr\n";
+ }
+ @busy = @newbusy;
+ if (my $statement = $phase->[0]) {
+ my $clone;
+ if ( @free ) {
+ $clone = shift(@free);
+ } elsif ( !$MAX_HANDLES or
+ scalar(@free) + scalar(@busy) < $MAX_HANDLES ) {
+ $clone = $dbh->clone; # this will fail if over the server limit
+ }
+
+ if ( $clone ) {
+ my $rv = $clone->do($statement, {pg_async => PG_ASYNC});
+ if ( $rv ) {
+ warn "$statement\n";
+ shift @{ $phase }; # and actually take the statement off the queue
+ push @busy, $clone;
+ } # else I don't know, wait and retry
+ } # else too many handles, wait and retry
+ } elsif (@busy) {
+ # all statements are dispatched
+ warn "Waiting for phase $i to complete\n";
+ sleep 30;
+ }
+ } # while @$phase or @busy
+ $i++;
+ } # foreach $phase
+ warn "Schema changes complete.\n";
+
+# warn "Pre-schema change upgrades completed in ". (time-$start). " seconds\n"; # if $DEBUG;
+# $start = time;
+
+# dbdef->update_schema( dbdef_dist(datasrc), $dbh );
+