/*-------------------------------------------------------------------------
 *
 * execParallel.c
 *	  Support routines for parallel execution.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This file contains routines that are intended to support setting up,
 * using, and tearing down a ParallelContext from within the PostgreSQL
 * executor.  The ParallelContext machinery will handle starting the
 * workers and ensuring that their state generally matches that of the
 * leader; see src/backend/access/transam/README.parallel for details.
 * However, we must save and restore relevant executor state, such as
 * any ParamListInfo associated with the query, buffer usage info, and
 * the actual plan to be passed down to the worker.
 *
 * IDENTIFICATION
 *	  src/backend/executor/execParallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"

/*
 * Magic numbers for parallel executor communication.  We use constants
 * greater than any 32-bit integer here so that values < 2^32 can be used
 * by individual parallel nodes to store their own state.
 */
#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002)
#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)

#define PARALLEL_TUPLE_QUEUE_SIZE		65536
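
/*
 * The per-node keys are conventionally the node's plan_node_id, which is
 * always well below 2^32 and therefore cannot collide with the keys above.
 * A rough sketch of that pattern (see the individual scan-node files, e.g.
 * nodeSeqscan.c, for the real calls):
 *
 *		estimate:    shm_toc_estimate_chunk(&pcxt->estimator, size);
 *		             shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		initialize:  pscan = shm_toc_allocate(pcxt->toc, size);
 *		             shm_toc_insert(pcxt->toc, plan->plan_node_id, pscan);
 *		worker:      pscan = shm_toc_lookup(toc, plan->plan_node_id);
 */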

/* DSM structure for accumulating per-PlanState instrumentation. */
struct SharedExecutorInstrumentation
{
	int instrument_options;
	int instrument_offset;		/* offset of first Instrumentation struct */
	int num_workers;							/* # of workers */
	int num_plan_nodes;							/* # of plan nodes */
	int plan_node_id[FLEXIBLE_ARRAY_MEMBER];	/* array of plan node IDs */
	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
};
#define GetInstrumentationArray(sei) \
	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
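
/*
 * The Instrumentation entries that follow the header are grouped by plan
 * node: the entry for the i'th node recorded in plan_node_id[] and worker w
 * lives at GetInstrumentationArray(sei)[i * sei->num_workers + w].  This is
 * the indexing used by ExecParallelRetrieveInstrumentation and
 * ExecParallelReportInstrumentation below.
 */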

/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
{
	ParallelContext *pcxt;
	int nnodes;
} ExecParallelEstimateContext;

/* Context object for ExecParallelInitializeDSM. */
typedef struct ExecParallelInitializeDSMContext
{
	ParallelContext *pcxt;
	SharedExecutorInstrumentation *instrumentation;
	int nnodes;
} ExecParallelInitializeDSMContext;

/* Helper functions that run in the parallel leader. */
static char *ExecSerializePlan(Plan *plan, EState *estate);
static bool ExecParallelEstimate(PlanState *node,
					 ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
					 ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
							 bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
						  SharedExecutorInstrumentation *instrumentation);

/* Helper functions that run in the parallel worker. */
static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);

/*
 * Create a serialized representation of the plan to be sent to each worker.
 */
static char *
ExecSerializePlan(Plan *plan, EState *estate)
{
	PlannedStmt *pstmt;
	ListCell   *tlist;

	/* We can't scribble on the original plan, so make a copy. */
	plan = copyObject(plan);

	/*
	 * The worker will start its own copy of the executor, and that copy will
	 * insert a junk filter if the toplevel node has any resjunk entries. We
	 * don't want that to happen, because while resjunk columns shouldn't be
	 * sent back to the user, here the tuples are coming back to another
	 * backend which may very well need them.  So mutate the target list
	 * accordingly.  This is sort of a hack; there might be better ways to do
	 * this...
	 */
	foreach(tlist, plan->targetlist)
	{
		TargetEntry *tle = (TargetEntry *) lfirst(tlist);

		tle->resjunk = false;
	}

	/*
	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
	 * for our purposes, but the worker will need at least a minimal
	 * PlannedStmt to start the executor.
	 */
	pstmt = makeNode(PlannedStmt);
	pstmt->commandType = CMD_SELECT;
	pstmt->queryId = 0;
	pstmt->hasReturning = 0;
	pstmt->hasModifyingCTE = 0;
	pstmt->canSetTag = 1;
	pstmt->transientPlan = 0;
	pstmt->planTree = plan;
	pstmt->rtable = estate->es_range_table;
	pstmt->resultRelations = NIL;
	pstmt->utilityStmt = NULL;
	pstmt->subplans = NIL;
	pstmt->rewindPlanIDs = NULL;
	pstmt->rowMarks = NIL;
	pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
	pstmt->relationOids = NIL;
	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
	pstmt->hasRowSecurity = false;
	pstmt->hasForeignJoin = false;

	/* Return serialized copy of our dummy PlannedStmt. */
	return nodeToString(pstmt);
}

/*
 * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
 * may need some state which is shared across all parallel workers.  Before
 * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
 * shm_toc_estimate_keys on &pcxt->estimator.
 *
 * While we're at it, count the number of PlanState nodes in the tree, so
 * we know how many Instrumentation slots the SharedExecutorInstrumentation
 * structure will need.
 */
static bool
ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
{
	if (planstate == NULL)
		return false;

	/* Count this node. */
	e->nnodes++;

	/* Call estimators for parallel-aware nodes. */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanEstimate((SeqScanState *) planstate,
									e->pcxt);
				break;
			case T_ForeignScanState:
				ExecForeignScanEstimate((ForeignScanState *) planstate,
										e->pcxt);
				break;
			case T_CustomScanState:
				ExecCustomScanEstimate((CustomScanState *) planstate,
									   e->pcxt);
				break;
			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}

/*
 * Initialize the dynamic shared memory segment that will be used to control
 * parallel execution.
 */
static bool
ExecParallelInitializeDSM(PlanState *planstate,
						  ExecParallelInitializeDSMContext *d)
{
	if (planstate == NULL)
		return false;

	/* If instrumentation is enabled, initialize slot for this node. */
	if (d->instrumentation != NULL)
		d->instrumentation->plan_node_id[d->nnodes] =
			planstate->plan->plan_node_id;

	/* Count this node. */
	d->nnodes++;

	/*
	 * Call initializers for parallel-aware plan nodes.
	 *
	 * Ordinary plan nodes won't do anything here, but parallel-aware plan
	 * nodes may need to initialize shared state in the DSM before parallel
	 * workers are available.  They can allocate the space they previously
	 * estimated using shm_toc_allocate, and add the keys they previously
	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
	 */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
										 d->pcxt);
				break;
			case T_ForeignScanState:
				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
											 d->pcxt);
				break;
			case T_CustomScanState:
				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
											d->pcxt);
				break;
			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}

/*
 * Set up the response queues for backend workers to return tuples
 * to the main backend.
 */
static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
	shm_mq_handle **responseq;
	char	   *tqueuespace;
	int			i;

	/* Skip this if no workers. */
	if (pcxt->nworkers == 0)
		return NULL;

	/* Allocate memory for shared memory queue handles. */
	responseq = (shm_mq_handle **)
		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

	/*
	 * If not reinitializing, allocate space from the DSM for the queues;
	 * otherwise, find the already allocated space.
	 */
	if (!reinitialize)
		tqueuespace =
			shm_toc_allocate(pcxt->toc,
							 PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
	else
		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);

	/* Create the queues, and become the receiver for each. */
	for (i = 0; i < pcxt->nworkers; ++i)
	{
		shm_mq	   *mq;

		mq = shm_mq_create(tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE,
						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);

		shm_mq_set_receiver(mq, MyProc);
		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
	}

	/* Add array of queues to shm_toc, so others can find it. */
	if (!reinitialize)
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

	/* Return array of handles. */
	return responseq;
}

/*
 * Re-initialize the parallel executor info such that it can be reused by
 * workers.
 */
void
ExecParallelReinitialize(ParallelExecutorInfo *pei)
{
	ReinitializeParallelDSM(pei->pcxt);
	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
	pei->finished = false;
}

/*
 * Sets up the required infrastructure for backend workers to perform
 * execution and return results to the main backend.
 */
ParallelExecutorInfo *
ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
{
	ParallelExecutorInfo *pei;
	ParallelContext *pcxt;
	ExecParallelEstimateContext e;
	ExecParallelInitializeDSMContext d;
	char	   *pstmt_data;
	char	   *pstmt_space;
	char	   *param_space;
	BufferUsage *bufusage_space;
	SharedExecutorInstrumentation *instrumentation = NULL;
	int			pstmt_len;
	int			param_len;
	int			instrumentation_len = 0;
	int			instrument_offset = 0;

	/* Allocate object for return value. */
	pei = palloc0(sizeof(ParallelExecutorInfo));
	pei->finished = false;
	pei->planstate = planstate;

	/* Fix up and serialize plan to be sent to workers. */
	pstmt_data = ExecSerializePlan(planstate->plan, estate);

	/* Create a parallel context. */
	pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
	pei->pcxt = pcxt;

	/*
	 * Before telling the parallel context to create a dynamic shared memory
	 * segment, we need to figure out how big it should be.  Estimate space
	 * for the various things we need to store.
	 */

	/* Estimate space for serialized PlannedStmt. */
	pstmt_len = strlen(pstmt_data) + 1;
	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate space for serialized ParamListInfo. */
	param_len = EstimateParamListSpace(estate->es_param_list_info);
	shm_toc_estimate_chunk(&pcxt->estimator, param_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage.
	 *
	 * If EXPLAIN is not in use and there are no extensions loaded that care,
	 * we could skip this.  But we have no way of knowing whether anyone's
	 * looking at pgBufferUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   sizeof(BufferUsage) * pcxt->nworkers);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate space for tuple queues. */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Give parallel-aware nodes a chance to add to the estimates, and get
	 * a count of how many PlanState nodes there are.
	 */
	e.pcxt = pcxt;
	e.nnodes = 0;
	ExecParallelEstimate(planstate, &e);

	/* Estimate space for instrumentation, if required. */
	if (estate->es_instrument)
	{
		instrumentation_len =
			offsetof(SharedExecutorInstrumentation, plan_node_id)
			+ sizeof(int) * e.nnodes;
		instrumentation_len = MAXALIGN(instrumentation_len);
		instrument_offset = instrumentation_len;
		instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers;
		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/* Everyone's had a chance to ask for space, so now create the DSM. */
	InitializeParallelDSM(pcxt);

	/*
	 * OK, now we have a dynamic shared memory segment, and it should be big
	 * enough to store all of the data we estimated we would want to put into
	 * it, plus whatever general stuff (not specifically executor-related) the
	 * ParallelContext itself needs to store there.  None of the space we
	 * asked for has been allocated or initialized yet, though, so do that.
	 */

	/* Store serialized PlannedStmt. */
	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
	memcpy(pstmt_space, pstmt_data, pstmt_len);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);

	/* Store serialized ParamListInfo. */
	param_space = shm_toc_allocate(pcxt->toc, param_len);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
	SerializeParamList(estate->es_param_list_info, &param_space);

	/* Allocate space for each worker's BufferUsage; no need to initialize. */
	bufusage_space = shm_toc_allocate(pcxt->toc,
									  sizeof(BufferUsage) * pcxt->nworkers);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
	pei->buffer_usage = bufusage_space;

	/* Set up tuple queues. */
	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

	/*
	 * If instrumentation options were supplied, allocate space for the
	 * data.  It only gets partially initialized here; the rest happens
	 * during ExecParallelInitializeDSM.
	 */
	if (estate->es_instrument)
	{
		Instrumentation *instrument;
		int		i;

		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
		instrumentation->instrument_options = estate->es_instrument;
		instrumentation->instrument_offset = instrument_offset;
		instrumentation->num_workers = nworkers;
		instrumentation->num_plan_nodes = e.nnodes;
		instrument = GetInstrumentationArray(instrumentation);
		for (i = 0; i < nworkers * e.nnodes; ++i)
			InstrInit(&instrument[i], estate->es_instrument);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
					   instrumentation);
		pei->instrumentation = instrumentation;
	}

	/*
	 * Give parallel-aware nodes a chance to initialize their shared data.
	 * This also fills in the plan_node_id array in the instrumentation
	 * structure, if it exists.
	 */
	d.pcxt = pcxt;
	d.instrumentation = instrumentation;
	d.nnodes = 0;
	ExecParallelInitializeDSM(planstate, &d);

	/*
	 * Make sure that the world hasn't shifted under our feet.  This could
	 * probably just be an Assert(), but let's be conservative for now.
	 */
	if (e.nnodes != d.nnodes)
		elog(ERROR, "inconsistent count of PlanState nodes");

	/* OK, we're ready to rock and roll. */
	return pei;
}
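
/*
 * A rough sketch of how the Gather node (nodeGather.c) is expected to drive
 * the entry points in this file; see that file for the authoritative logic:
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate, nworkers);
 *		LaunchParallelWorkers(pei->pcxt);
 *		... read tuples from the queues in pei->tqueue ...
 *		ExecParallelFinish(pei);		wait for workers, collect statistics
 *		ExecParallelReinitialize(pei);	only when rescanning
 *		ExecParallelCleanup(pei);		tear down the DSM when fully done
 */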

/*
 * Copy instrumentation information about this node and its descendants from
 * dynamic shared memory.
 */
static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
						  SharedExecutorInstrumentation *instrumentation)
{
	Instrumentation *instrument;
	int		i;
	int		n;
	int		ibytes;
	int		plan_node_id = planstate->plan->plan_node_id;

	/* Find the instrumentation for this node. */
	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
		if (instrumentation->plan_node_id[i] == plan_node_id)
			break;
	if (i >= instrumentation->num_plan_nodes)
		elog(ERROR, "plan node %d not found", plan_node_id);

	/* Accumulate the statistics from all workers. */
	instrument = GetInstrumentationArray(instrumentation);
	instrument += i * instrumentation->num_workers;
	for (n = 0; n < instrumentation->num_workers; ++n)
		InstrAggNode(planstate->instrument, &instrument[n]);

	/* Also store the per-worker detail. */
	ibytes = instrumentation->num_workers * sizeof(Instrumentation);
	planstate->worker_instrument =
		palloc(offsetof(WorkerInstrumentation, instrument) + ibytes);
	planstate->worker_instrument->num_workers = instrumentation->num_workers;
	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);

	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
								 instrumentation);
}

/*
 * Finish parallel execution.  We wait for parallel workers to finish, and
 * accumulate their buffer usage and instrumentation.
 */
void
ExecParallelFinish(ParallelExecutorInfo *pei)
{
	int		i;

	if (pei->finished)
		return;

	/* First, wait for the workers to finish. */
	WaitForParallelWorkersToFinish(pei->pcxt);

	/* Next, accumulate buffer usage. */
	for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
		InstrAccumParallelQuery(&pei->buffer_usage[i]);

	/* Finally, accumulate instrumentation, if any. */
	if (pei->instrumentation)
		ExecParallelRetrieveInstrumentation(pei->planstate,
											pei->instrumentation);

	pei->finished = true;
}

/*
 * Clean up whatever ParallelExecutorInfo resources still exist after
 * ExecParallelFinish.  We separate these routines because someone might
 * want to examine the contents of the DSM after ExecParallelFinish and
 * before calling this routine.
 */
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
	if (pei->pcxt != NULL)
	{
		DestroyParallelContext(pei->pcxt);
		pei->pcxt = NULL;
	}
	pfree(pei);
}

/*
 * Create a DestReceiver to write tuples we produce to the shm_mq designated
 * for that purpose.
 */
static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
	char	   *mqspace;
	shm_mq	   *mq;

	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
	mq = (shm_mq *) mqspace;
	shm_mq_set_sender(mq, MyProc);
	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}

/*
 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
 */
static QueryDesc *
ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
						 int instrument_options)
{
	char	   *pstmtspace;
	char	   *paramspace;
	PlannedStmt *pstmt;
	ParamListInfo paramLI;

	/* Reconstruct leader-supplied PlannedStmt. */
	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
	pstmt = (PlannedStmt *) stringToNode(pstmtspace);

	/* Reconstruct ParamListInfo. */
	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
	paramLI = RestoreParamList(&paramspace);

	/*
	 * Create a QueryDesc for the query.
	 *
	 * It's not obvious how to obtain the query string from here; and even if
	 * we could, copying it would take more cycles than not copying it.  But
	 * it's a bit unsatisfying to just use a dummy string here, so consider
	 * revising this someday.
	 */
	return CreateQueryDesc(pstmt,
						   "<parallel query>",
						   GetActiveSnapshot(), InvalidSnapshot,
						   receiver, paramLI, instrument_options);
}

/*
 * Copy instrumentation information from this node and its descendants into
 * dynamic shared memory, so that the parallel leader can retrieve it.
 */
static bool
ExecParallelReportInstrumentation(PlanState *planstate,
						  SharedExecutorInstrumentation *instrumentation)
{
	int		i;
	int		plan_node_id = planstate->plan->plan_node_id;
	Instrumentation *instrument;

	InstrEndLoop(planstate->instrument);

	/*
	 * If we shuffled the plan_node_id values in ps_instrument into sorted
	 * order, we could use binary search here.  This might matter someday
	 * if we're pushing down sufficiently large plan trees.  For now, do it
	 * the slow, dumb way.
	 */
	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
		if (instrumentation->plan_node_id[i] == plan_node_id)
			break;
	if (i >= instrumentation->num_plan_nodes)
		elog(ERROR, "plan node %d not found", plan_node_id);

	/*
	 * Add our statistics to the per-node, per-worker totals.  It's possible
	 * that this could happen more than once if we relaunched workers.
	 */
	instrument = GetInstrumentationArray(instrumentation);
	instrument += i * instrumentation->num_workers;
	Assert(IsParallelWorker());
	Assert(ParallelWorkerNumber < instrumentation->num_workers);
	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);

	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
								 instrumentation);
}

/*
 * Initialize the PlanState and its descendants with the information
 * retrieved from shared memory.  This has to be done once the PlanState
 * is allocated and initialized by the executor; that is, after ExecutorStart().
 */
static bool
ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
{
	if (planstate == NULL)
		return false;

	/* Call initializers for parallel-aware plan nodes. */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
				break;
			case T_ForeignScanState:
				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
												toc);
				break;
			case T_CustomScanState:
				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
											   toc);
				break;
			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
}

/*
 * Main entrypoint for parallel query worker processes.
 *
 * We reach this function from ParallelWorkerMain, so the setup necessary to
 * create a sensible parallel environment has already been done;
 * ParallelWorkerMain worries
 * about stuff like the transaction state, combo CID mappings, and GUC values,
 * so we don't need to deal with any of that here.
 *
 * Our job is to deal with concerns specific to the executor.  The parallel
 * group leader will have stored a serialized PlannedStmt, and it's our job
 * to execute that plan and write the resulting tuples to the appropriate
 * tuple queue.  Various bits of supporting information that we need in order
 * to do this are also stored in the dsm_segment and can be accessed through
 * the shm_toc.
 */
static void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
	BufferUsage *buffer_usage;
	DestReceiver *receiver;
	QueryDesc  *queryDesc;
	SharedExecutorInstrumentation *instrumentation;
	int			instrument_options = 0;

	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
	receiver = ExecParallelGetReceiver(seg, toc);
	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION);
	if (instrumentation != NULL)
		instrument_options = instrumentation->instrument_options;
	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

	/* Prepare to track buffer usage during query execution. */
	InstrStartParallelQuery();

	/* Start up the executor, have it run the plan, and then shut it down. */
	ExecutorStart(queryDesc, 0);
	ExecParallelInitializeWorker(queryDesc->planstate, toc);
	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
	ExecutorFinish(queryDesc);

	/* Report buffer usage during parallel execution. */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);

	/* Report instrumentation data if any instrumentation options are set. */
	if (instrumentation != NULL)
		ExecParallelReportInstrumentation(queryDesc->planstate,
										  instrumentation);

	/* Must do this after capturing instrumentation. */
	ExecutorEnd(queryDesc);

	/* Cleanup. */
	FreeQueryDesc(queryDesc);
	(*receiver->rDestroy) (receiver);
}