From df4685fb0cad1c75970b6e8d0aacca4d03545e04 Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Fri, 4 Mar 2016 12:59:10 -0500
Subject: [PATCH] Minor optimizations based on ParallelContext having
 nworkers_launched.

Originally, we didn't have nworkers_launched, so code that used
parallel contexts had to be prepared for the possibility that not all
of the workers requested actually got launched.  But now we can count
on knowing the number of workers that were successfully launched,
which can shave off a few cycles and simplify some code slightly.

Amit Kapila, reviewed by Haribabu Kommi, per a suggestion from Peter
Geoghegan.
---
 src/backend/access/transam/parallel.c |  8 ++++----
 src/backend/executor/execParallel.c   |  2 +-
 src/backend/executor/nodeGather.c     | 18 +++++++-----------
 3 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 4f91cd0265..0bba9a7dbd 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -520,7 +520,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 		 */
 		CHECK_FOR_INTERRUPTS();
 
-		for (i = 0; i < pcxt->nworkers; ++i)
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
 		{
 			if (pcxt->worker[i].error_mqh != NULL)
 			{
@@ -560,7 +560,7 @@ WaitForParallelWorkersToExit(ParallelContext *pcxt)
 	int			i;
 
 	/* Wait until the workers actually die. */
-	for (i = 0; i < pcxt->nworkers; ++i)
+	for (i = 0; i < pcxt->nworkers_launched; ++i)
 	{
 		BgwHandleStatus status;
 
@@ -610,7 +610,7 @@ DestroyParallelContext(ParallelContext *pcxt)
 	/* Kill each worker in turn, and forget their error queues. */
 	if (pcxt->worker != NULL)
 	{
-		for (i = 0; i < pcxt->nworkers; ++i)
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
 		{
 			if (pcxt->worker[i].error_mqh != NULL)
 			{
@@ -708,7 +708,7 @@ HandleParallelMessages(void)
 		if (pcxt->worker == NULL)
 			continue;
 
-		for (i = 0; i < pcxt->nworkers; ++i)
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
 		{
 			/*
 			 * Read as many messages as we can from each worker, but stop when
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 95e8e41d2b..93c786abdb 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -522,7 +522,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	WaitForParallelWorkersToFinish(pei->pcxt);
 
 	/* Next, accumulate buffer usage. */
-	for (i = 0; i < pei->pcxt->nworkers; ++i)
+	for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
 	/* Finally, accumulate instrumentation, if any. */
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b48b..3f0ed69632 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -153,7 +153,6 @@ ExecGather(GatherState *node)
 	if (gather->num_workers > 0 && IsInParallelMode())
 	{
 		ParallelContext *pcxt;
-		bool		got_any_worker = false;
 
 		/* Initialize the workers required to execute Gather node. */
 		if (!node->pei)
@@ -169,29 +168,26 @@ ExecGather(GatherState *node)
 		LaunchParallelWorkers(pcxt);
 
 		/* Set up tuple queue readers to read the results. */
-		if (pcxt->nworkers > 0)
+		if (pcxt->nworkers_launched > 0)
 		{
 			node->nreaders = 0;
 			node->reader =
-				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+				palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
 
-			for (i = 0; i < pcxt->nworkers; ++i)
+			for (i = 0; i < pcxt->nworkers_launched; ++i)
 			{
-				if (pcxt->worker[i].bgwhandle == NULL)
-					continue;
-
 				shm_mq_set_handle(node->pei->tqueue[i],
 								  pcxt->worker[i].bgwhandle);
 				node->reader[node->nreaders++] =
 					CreateTupleQueueReader(node->pei->tqueue[i],
 										   fslot->tts_tupleDescriptor);
-				got_any_worker = true;
 			}
 		}
-
-		/* No workers? Then never mind. */
-		if (!got_any_worker)
+		else
+		{
+			/* No workers? Then never mind. */
 			ExecShutdownGatherWorkers(node);
+		}
 	}
 
 	/* Run plan locally if no workers or not single-copy. */
-- 
GitLab
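For readers outside the PostgreSQL tree, a minimal standalone sketch of the pattern the
patch adopts may help; it is not PostgreSQL code, and the names (worker_pool,
launch_workers, and so on) are hypothetical. The idea is simply to record how many
workers actually started and to loop over that count, so the per-slot "did this one
launch?" check inside the loop becomes unnecessary.

/*
 * Hypothetical illustration, not PostgreSQL APIs: track how many workers
 * actually launched and iterate over that count, rather than iterating over
 * the requested count and skipping empty slots.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct worker_pool
{
	int		nworkers;			/* workers requested */
	int		nworkers_launched;	/* workers actually started */
	int	   *worker_id;			/* one entry per launched worker */
} worker_pool;

/* Pretend to launch workers; in real systems some requests can fail. */
static void
launch_workers(worker_pool *pool, int nrequested)
{
	pool->nworkers = nrequested;
	pool->nworkers_launched = 0;
	pool->worker_id = malloc(sizeof(int) * nrequested);

	for (int i = 0; i < nrequested; i++)
	{
		if (i == nrequested - 1)	/* simulate one launch failing */
			break;
		pool->worker_id[pool->nworkers_launched++] = i + 1;
	}
}

int
main(void)
{
	worker_pool pool;

	launch_workers(&pool, 4);

	/*
	 * Because nworkers_launched is tracked, the loop bound is exact and no
	 * "was this slot actually filled?" test is needed in the loop body.
	 */
	for (int i = 0; i < pool.nworkers_launched; i++)
		printf("worker %d is running\n", pool.worker_id[i]);

	free(pool.worker_id);
	return 0;
}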