diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 4f91cd0265db041c671ab39ace93dd70b7a170ea..0bba9a7dbdaef765292fc528062315017586b859 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -520,7 +520,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) */ CHECK_FOR_INTERRUPTS(); - for (i = 0; i < pcxt->nworkers; ++i) + for (i = 0; i < pcxt->nworkers_launched; ++i) { if (pcxt->worker[i].error_mqh != NULL) { @@ -560,7 +560,7 @@ WaitForParallelWorkersToExit(ParallelContext *pcxt) int i; /* Wait until the workers actually die. */ - for (i = 0; i < pcxt->nworkers; ++i) + for (i = 0; i < pcxt->nworkers_launched; ++i) { BgwHandleStatus status; @@ -610,7 +610,7 @@ DestroyParallelContext(ParallelContext *pcxt) /* Kill each worker in turn, and forget their error queues. */ if (pcxt->worker != NULL) { - for (i = 0; i < pcxt->nworkers; ++i) + for (i = 0; i < pcxt->nworkers_launched; ++i) { if (pcxt->worker[i].error_mqh != NULL) { @@ -708,7 +708,7 @@ HandleParallelMessages(void) if (pcxt->worker == NULL) continue; - for (i = 0; i < pcxt->nworkers; ++i) + for (i = 0; i < pcxt->nworkers_launched; ++i) { /* * Read as many messages as we can from each worker, but stop when diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 95e8e41d2bbc86f8a744f2722fbf0921da322761..93c786abdbe4dc265abe1bc867480f555e086b59 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -522,7 +522,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei) WaitForParallelWorkersToFinish(pei->pcxt); /* Next, accumulate buffer usage. */ - for (i = 0; i < pei->pcxt->nworkers; ++i) + for (i = 0; i < pei->pcxt->nworkers_launched; ++i) InstrAccumParallelQuery(&pei->buffer_usage[i]); /* Finally, accumulate instrumentation, if any. */ diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 16c981b48b277a26b84040c59d736d04df54c205..3f0ed6963277bf5e8ef438580e2b4835c95c597d 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -153,7 +153,6 @@ ExecGather(GatherState *node) if (gather->num_workers > 0 && IsInParallelMode()) { ParallelContext *pcxt; - bool got_any_worker = false; /* Initialize the workers required to execute Gather node. */ if (!node->pei) @@ -169,29 +168,26 @@ ExecGather(GatherState *node) LaunchParallelWorkers(pcxt); /* Set up tuple queue readers to read the results. */ - if (pcxt->nworkers > 0) + if (pcxt->nworkers_launched > 0) { node->nreaders = 0; node->reader = - palloc(pcxt->nworkers * sizeof(TupleQueueReader *)); + palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *)); - for (i = 0; i < pcxt->nworkers; ++i) + for (i = 0; i < pcxt->nworkers_launched; ++i) { - if (pcxt->worker[i].bgwhandle == NULL) - continue; - shm_mq_set_handle(node->pei->tqueue[i], pcxt->worker[i].bgwhandle); node->reader[node->nreaders++] = CreateTupleQueueReader(node->pei->tqueue[i], fslot->tts_tupleDescriptor); - got_any_worker = true; } } - - /* No workers? Then never mind. */ - if (!got_any_worker) + else + { + /* No workers? Then never mind. */ ExecShutdownGatherWorkers(node); + } } /* Run plan locally if no workers or not single-copy. */