/*-------------------------------------------------------------------------
 *
 * cdbdisp_async.c
 *	  Functions for asynchronous implementation of dispatching
 *	  commands to QExecutors.
 *
 * GPDB_12_MERGE_FIXME: We should switch to using WaitEventSetWait() instead
 * of straight poll() in this file. WaitEventSetWait() would report the status
 * using the new wait event infrastructure, so that it would show up as a
 * separate state in pg_stat_activity. It's also potentially more efficient.
 *
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbdisp_async.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif

#include "storage/ipc.h"		/* For proc_exit_inprogress  */
#include "tcop/tcopprot.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_async.h"
#include "cdb/cdbdispatchresult.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbpq.h"
#include "miscadmin.h"
#include "commands/sequence.h"
#include "access/xact.h"
#include "utils/timestamp.h"

/*
 * poll() timeout, in milliseconds, used while waiting for QE results when no
 * cancel/finish signal is pending (or one has already been sent).
 */
#define DISPATCH_WAIT_TIMEOUT_MSEC 2000

/*
 * Ideally, we should set timeout to zero to cancel QEs as soon as possible,
 * but considering the cost of sending cancel signal is high, we want to process
 * as many finishing QEs as possible before cancelling
 */
#define DISPATCH_WAIT_CANCEL_TIMEOUT_MSEC 100

typedef struct CdbDispatchCmdAsync
{

	/*
	 * dispatchResultPtrArray: Array[0..dispatchCount-1] of CdbDispatchResult*
	 * Each CdbDispatchResult object points to a SegmentDatabaseDescriptor
	 * that dispatcher will send the command to.
	 */
	struct CdbDispatchResult **dispatchResultPtrArray;

	/* Number of segment DBs dispatched */
67
	int			dispatchCount;
68 69 70

	/*
	 * Depending on this mode, we may send query cancel or query finish
71 72
	 * message to QE while we are waiting it to complete.  NONE means we
	 * expect QE to complete without any instruction.
73 74 75 76
	 */
	volatile DispatchWaitMode waitMode;

	/*
77 78
	 * Text information to dispatch: The format is type(1 byte) + length(size
	 * of int) + content(n bytes)
79
	 *
80 81 82
	 * For DTX command, type is 'T', it's built by function
	 * buildGpDtxProtocolCommand. For query, type is 'M', it's built by
	 * function buildGpQueryString.
83
	 */
84 85
	char	   *query_text;
	int			query_text_len;
86

87
} CdbDispatchCmdAsync;
88

89
static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len);
90

91 92
static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
								  DispatchWaitMode waitMode);
93

94 95
static void cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
							 struct Gang *gp,
96
							 int sliceIndex);
97
static void	cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds);
98

99 100
static bool	cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds);
static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds);
101 102 103 104

DispatcherInternalFuncs DispatcherAsyncFuncs =
{
	cdbdisp_checkForCancel_async,
105
	cdbdisp_getWaitSocketFd_async,
106 107
	cdbdisp_makeDispatchParams_async,
	cdbdisp_checkDispatchResult_async,
108 109
	cdbdisp_dispatchToGang_async,
	cdbdisp_waitDispatchFinish_async
110 111 112
};


113
static void dispatchCommand(CdbDispatchResult *dispatchResult,
114 115 116
				const char *query_text,
				int query_text_len);

117 118
static void checkDispatchResult(CdbDispatcherState *ds,
					bool wait);
119

120
static bool processResults(CdbDispatchResult *dispatchResult);
121 122

static void
123
			signalQEs(CdbDispatchCmdAsync *pParms);
124 125

static void
126
			checkSegmentAlive(CdbDispatchCmdAsync *pParms);
127

128
static void
129
			handlePollError(CdbDispatchCmdAsync *pParms);
130

131
static void
132
			handlePollSuccess(CdbDispatchCmdAsync *pParms, struct pollfd *fds);
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148

/*
 * Non-blocking check of the dispatch results.
 *
 * Processes whatever the QEs have already sent, without waiting for the
 * dispatched commands to complete, and then reports the outcome of
 * cdbdisp_checkResultsErrcode() on the primary results.
 *
 * Return true if any connection received error.
 */
static bool
cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds)
{
	bool		anyError;

	Assert(ds);

	/* wait = false: drain pending input only, never block */
	checkDispatchResult(ds, false);

	anyError = cdbdisp_checkResultsErrcode(ds->primaryResults);
	return anyError;
}

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
/*
 * Return a FD to wait for, after dispatching.
 */
static int
cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds)
{
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int			i;

	Assert(ds);

	if (proc_exit_inprogress)
		return PGINVALID_SOCKET;

	/*
	 * This should match the logic in cdbdisp_checkForCancel_async(). In
	 * particular, when cdbdisp_checkForCancel_async() is called, it must
	 * process any incoming data from the socket we return here, or we
	 * will busy wait.
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult;
		SegmentDatabaseDescriptor *segdbDesc;

		dispatchResult = pParms->dispatchResultPtrArray[i];
		segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Already finished with this QE?
		 */
		if (!dispatchResult->stillRunning)
			continue;

		Assert(!cdbconn_isBadConnection(segdbDesc));

		return PQsocket(segdbDesc->conn);
	}

	return PGINVALID_SOCKET;
}

191 192 193 194 195 196 197 198
/*
 * Block until all data are dispatched.
 */
static void
cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds)
{
	const static int DISPATCH_POLL_TIMEOUT = 500;
	struct pollfd *fds;
199 200 201 202
	int			nfds,
				i;
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int			dispatchCount = pParms->dispatchCount;
203 204 205

	fds = (struct pollfd *) palloc(dispatchCount * sizeof(struct pollfd));

206
	while (true)
207
	{
208
		int			pollRet;
209 210 211 212 213 214 215 216

		nfds = 0;
		memset(fds, 0, dispatchCount * sizeof(struct pollfd));

		for (i = 0; i < dispatchCount; i++)
		{
			CdbDispatchResult *qeResult = pParms->dispatchResultPtrArray[i];
			SegmentDatabaseDescriptor *segdbDesc = qeResult->segdbDesc;
217 218
			PGconn	   *conn = segdbDesc->conn;
			int			ret;
219 220 221 222 223

			/* skip already completed connections */
			if (conn->outCount == 0)
				continue;

224 225 226 227
			/*
			 * call send for this connection regardless of its POLLOUT status,
			 * because it may be writable NOW
			 */
228 229 230 231 232 233
			ret = pqFlushNonBlocking(conn);

			if (ret == 0)
				continue;
			else if (ret > 0)
			{
234 235
				int			sock = PQsocket(segdbDesc->conn);

236 237 238 239 240 241 242
				Assert(sock >= 0);
				fds[nfds].fd = sock;
				fds[nfds].events = POLLOUT;
				nfds++;
			}
			else if (ret < 0)
			{
243
				/* error message should be set up already */
244
				char	   *msg = PQerrorMessage(conn);
245 246 247 248

				qeResult->stillRunning = false;
				ereport(ERROR,
						(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
249
						 errmsg("Command could not be dispatch to segment %s: %s", qeResult->segdbDesc->whoami, msg ? msg : "unknown error")));
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
			}
		}

		if (nfds == 0)
			break;

		/* guarantee poll() is interruptible */
		do
		{
			CHECK_FOR_INTERRUPTS();

			pollRet = poll(fds, nfds, DISPATCH_POLL_TIMEOUT);
			if (pollRet == 0)
				ELOG_DISPATCHER_DEBUG("cdbdisp_waitDispatchFinish_async(): Dispatch poll timeout after %d ms", DISPATCH_POLL_TIMEOUT);
		}
		while (pollRet == 0 || (pollRet < 0 && (SOCK_ERRNO == EINTR || SOCK_ERRNO == EAGAIN)));

		if (pollRet < 0)
			elog(ERROR, "Poll failed during dispatch");
	}

	pfree(fds);
}

274 275 276
/*
 * Dispatch command to gang.
 *
277 278
 * Throw out error to upper try-catch block if anything goes wrong. This function only kicks off dispatching,
 * call cdbdisp_waitDispatchFinish_async to ensure the completion
279 280 281 282
 */
static void
cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
							 struct Gang *gp,
283
							 int sliceIndex)
284
{
285
	int			i;
286

287
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
288 289 290 291 292 293

	/*
	 * Start the dispatching
	 */
	for (i = 0; i < gp->size; i++)
	{
294
		CdbDispatchResult *qeResult;
295

296
		SegmentDatabaseDescriptor *segdbDesc = gp->db_descriptors[i];
297

298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
		Assert(segdbDesc != NULL);

		/*
		 * Initialize the QE's CdbDispatchResult object.
		 */
		qeResult = cdbdisp_makeResult(ds->primaryResults, segdbDesc, sliceIndex);
		if (qeResult == NULL)
		{
			elog(FATAL, "could not allocate resources for segworker communication");
		}
		pParms->dispatchResultPtrArray[pParms->dispatchCount++] = qeResult;

		dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
	}
}

/*
 * Check dispatch result.
 *
 * Wait all dispatch work to complete, either success or fail.
 * (Set stillRunning to true when one dispatch work is completed)
 */
static void
cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
322
								  DispatchWaitMode waitMode)
323 324
{
	Assert(ds != NULL);
325
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
326 327

	/* cdbdisp_destroyDispatcherState is called */
328
	if (pParms == NULL)
329 330
		return;

331 332 333 334
	/*
	 * Don't overwrite DISPATCH_WAIT_CANCEL or DISPATCH_WAIT_FINISH with
	 * DISPATCH_WAIT_NONE
	 */
335 336 337 338 339 340
	if (waitMode != DISPATCH_WAIT_NONE)
		pParms->waitMode = waitMode;

	checkDispatchResult(ds, true);

	/*
341 342
	 * It looks like everything went fine, make sure we don't miss a user
	 * cancellation?
343 344 345 346 347 348 349 350 351 352 353 354 355 356
	 *
	 * The waitMode argument is NONE when we are doing "normal work".
	 */
	if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH)
		CHECK_FOR_INTERRUPTS();
}

/*
 * Allocates memory for a CdbDispatchCmdAsync structure and do the initialization.
 *
 * Memory will be freed in function cdbdisp_destroyDispatcherState by deleting the
 * memory context.
 */
static void *
357
cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len)
358
{
359
	int			maxResults = maxSlices * largestGangSize;
360
	int			size = 0;
361 362 363 364 365 366 367 368 369 370

	CdbDispatchCmdAsync *pParms = palloc0(sizeof(CdbDispatchCmdAsync));

	size = maxResults * sizeof(CdbDispatchResult *);
	pParms->dispatchResultPtrArray = (CdbDispatchResult **) palloc0(size);
	pParms->dispatchCount = 0;
	pParms->waitMode = DISPATCH_WAIT_NONE;
	pParms->query_text = queryText;
	pParms->query_text_len = len;

371
	return (void *) pParms;
372 373 374 375 376 377 378 379 380 381 382 383 384
}

/*
 * Receive and process results from all running QEs.
 *
 * wait: true, wait until all dispatch works are completed.
 *       false, return immediate when there's no more data.
 *
 * Each pass of the loop rebuilds the poll set from the QEs that are still
 * marked running, polls them for input, and then handles the poll error,
 * timeout or success case.  The loop ends when no QE is still running, when
 * the process is exiting, or (wait == false) after a poll error or timeout.
 *
 * Don't throw out error, instead, append the error message to
 * CdbDispatchResult.error_message.
 */
static void
checkDispatchResult(CdbDispatcherState *ds,
					bool wait)
{
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	CdbDispatchResults *meleeResults = ds->primaryResults;
	int			db_count = pParms->dispatchCount;
	int			timeout = 0;
	bool		sentSignal = false;
	struct pollfd *fds;
	uint8		ftsVersion = 0;

	fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd));

	/*
	 * OK, we are finished submitting the command to the segdbs. Now, we have
	 * to wait for them to finish.
	 */
	for (;;)
	{
		int			i;
		int			n;
		int			nfds = 0;

		/*
		 * bail-out if we are dying. Once QD dies, QE will recognize it
		 * shortly anyway.
		 */
		if (proc_exit_inprogress)
			break;

		/*
		 * escalate waitMode to cancel if: - user interrupt has occurred, - or
		 * an error has been reported by any QE, - in case the caller wants
		 * cancelOnError
		 */
		if ((InterruptPending || meleeResults->errcode) && meleeResults->cancelOnError)
			pParms->waitMode = DISPATCH_WAIT_CANCEL;

		/*
		 * Which QEs are still running and could send results to us?
		 */
		for (i = 0; i < db_count; i++)
		{
			CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
			SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
			PGconn	   *conn = segdbDesc->conn;
			int			sock;

			/*
			 * Already finished with this QE?
			 */
			if (!dispatchResult->stillRunning)
				continue;

			Assert(!cdbconn_isBadConnection(segdbDesc));

			/*
			 * Flush out buffer in case some commands are not fully
			 * dispatched to QEs, this can prevent QD from polling
			 * on such QEs forever.
			 */
			if (conn->outCount > 0)
			{
				/*
				 * Don't error out here, let following poll() routine to
				 * handle it.
				 */
				if (pqFlush(conn) < 0)
					elog(LOG, "Failed flushing outbound data to %s:%s",
						 segdbDesc->whoami, PQerrorMessage(conn));
			}

			/*
			 * Add socket to fd_set if still connected.
			 */
			sock = PQsocket(conn);
			Assert(sock >= 0);
			fds[nfds].fd = sock;
			fds[nfds].events = POLLIN;
			nfds++;
		}

		/*
		 * Break out when no QEs still running.
		 */
		if (nfds <= 0)
			break;

		/*
		 * Wait for results from QEs
		 *
		 * Don't wait if: - this is called from interconnect to check if
		 * there's any error.
		 *
		 * Lower the timeout if: - we need send signal to QEs.
		 */
		if (!wait)
			timeout = 0;
		else if (pParms->waitMode == DISPATCH_WAIT_NONE || sentSignal)
			timeout = DISPATCH_WAIT_TIMEOUT_MSEC;
		else
			timeout = DISPATCH_WAIT_CANCEL_TIMEOUT_MSEC;

		n = poll(fds, nfds, timeout);

		/*
		 * poll returns with an error, including one due to an interrupted
		 * call
		 */
		if (n < 0)
		{
			/* capture errno before any other call can overwrite it */
			int			sock_errno = SOCK_ERRNO;

			if (sock_errno == EINTR)
				continue;

			elog(LOG, "handlePollError poll() failed; errno=%d", sock_errno);

			handlePollError(pParms);

			/*
			 * Since an error was detected for the segment, request
			 * FTS to perform a probe before checking the segment
			 * state.
			 */
			FtsNotifyProber();
			checkSegmentAlive(pParms);

			if (pParms->waitMode != DISPATCH_WAIT_NONE)
			{
				signalQEs(pParms);
				sentSignal = true;
			}

			if (!wait)
				break;
		}
		/* If the time limit expires, poll() returns 0 */
		else if (n == 0)
		{
			if (pParms->waitMode != DISPATCH_WAIT_NONE)
			{
				signalQEs(pParms);
				sentSignal = true;
			}

			/*
			 * This code relies on FTS being triggered at regular
			 * intervals. Iff FTS detects change in configuration
			 * then check segment state. FTS probe is not triggered
			 * explicitly in this case because this happens every
			 * DISPATCH_WAIT_TIMEOUT_MSEC.
			 */
			if (ftsVersion == 0 || ftsVersion != getFtsVersion())
			{
				ftsVersion = getFtsVersion();
				checkSegmentAlive(pParms);
			}

			if (!wait)
				break;
		}
		/* We have data waiting on one or more of the connections. */
		else
			handlePollSuccess(pParms, fds);
	}

	pfree(fds);
}

/*
 * Helper function that actually kicks off the command on the libpq connection.
 */
static void
562
dispatchCommand(CdbDispatchResult *dispatchResult,
563 564 565 566
				const char *query_text,
				int query_text_len)
{
	TimestampTz beforeSend = 0;
567 568
	long		secs;
	int			usecs;
569 570 571 572 573 574 575

	if (DEBUG1 >= log_min_messages)
		beforeSend = GetCurrentTimestamp();

	/*
	 * Submit the command asynchronously.
	 */
576
	if (PQsendGpQuery_shared(dispatchResult->segdbDesc->conn, (char *) query_text, query_text_len, true) == 0)
577
	{
578 579
		char	   *msg = PQerrorMessage(dispatchResult->segdbDesc->conn);

580 581 582 583
		dispatchResult->stillRunning = false;
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("Command could not be dispatch to segment %s: %s",
584
						dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error")));
585 586
	}

587 588
	forwardQENotices();

589 590 591 592 593 594 595 596 597
	if (DEBUG1 >= log_min_messages)
	{
		TimestampDifference(beforeSend, GetCurrentTimestamp(), &secs, &usecs);

		if (secs != 0 || usecs > 1000)	/* Time > 1ms? */
			elog(LOG, "time for PQsendGpQuery_shared %ld.%06d", secs, usecs);
	}

	/*
598 599 600
	 * We'll keep monitoring this QE -- whether or not the command was
	 * dispatched -- in order to check for a lost connection or any other
	 * errors that libpq might have in store for us.
601 602 603 604 605 606 607
	 */
	dispatchResult->stillRunning = true;
	dispatchResult->hasDispatched = true;

	ELOG_DISPATCHER_DEBUG("Command dispatched to QE (%s)", dispatchResult->segdbDesc->whoami);
}

608
/*
609 610 611 612 613 614
 * Helper function to checkDispatchResult that handles errors that occur
 * during the poll() call.
 *
 * NOTE: The cleanup of the connections will be performed by handlePollTimeout().
 */
static void
615
handlePollError(CdbDispatchCmdAsync *pParms)
616
{
617
	int			i;
618 619 620 621 622 623 624 625 626 627 628 629 630

	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/* Skip if already finished or didn't dispatch. */
		if (!dispatchResult->stillRunning)
			continue;

		/* We're done with this QE, sadly. */
		if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
		{
631 632
			char	   *msg = PQerrorMessage(segdbDesc->conn);

633 634 635 636 637 638 639
			if (msg)
				elog(LOG, "Dispatcher encountered connection error on %s: %s", segdbDesc->whoami, msg);

			elog(LOG, "Dispatcher noticed bad connection in handlePollError()");

			/* Save error info for later. */
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
640 641 642
										   "Error after dispatch from %s: %s",
										   segdbDesc->whoami,
										   msg ? msg : "unknown error");
643 644 645 646 647 648

			PQfinish(segdbDesc->conn);
			segdbDesc->conn = NULL;
			dispatchResult->stillRunning = false;
		}
	}
649
	forwardQENotices();
650 651 652 653

	return;
}

654 655 656 657
/*
 * Receive and process results from QEs.
 */
static void
658
handlePollSuccess(CdbDispatchCmdAsync *pParms,
659 660
				  struct pollfd *fds)
{
661 662
	int			currentFdNumber = 0;
	int			i = 0;
663 664 665 666 667 668

	/*
	 * We have data waiting on one or more of the connections.
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
669 670
		bool		finished;
		int			sock;
671 672 673 674 675 676 677 678 679 680
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Skip if already finished or didn't dispatch.
		 */
		if (!dispatchResult->stillRunning)
			continue;

		ELOG_DISPATCHER_DEBUG("looking for results from %d of %d (%s)",
681
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);
682 683 684 685 686 687 688 689 690 691 692 693

		sock = PQsocket(segdbDesc->conn);
		Assert(sock >= 0);
		Assert(sock == fds[currentFdNumber].fd);

		/*
		 * Skip this connection if it has no input available.
		 */
		if (!(fds[currentFdNumber++].revents & POLLIN))
			continue;

		ELOG_DISPATCHER_DEBUG("PQsocket says there are results from %d of %d (%s)",
694
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);
695 696 697 698 699

		/*
		 * Receive and process results from this QE.
		 */
		finished = processResults(dispatchResult);
700

701 702 703 704 705 706 707 708
		/*
		 * Are we through with this QE now?
		 */
		if (finished)
		{
			dispatchResult->stillRunning = false;

			ELOG_DISPATCHER_DEBUG("processResults says we are finished with %d of %d (%s)",
709
								  i + 1, pParms->dispatchCount, segdbDesc->whoami);
710 711 712

			if (DEBUG1 >= log_min_messages)
			{
713 714
				char		msec_str[32];

715 716 717 718 719
				switch (check_log_duration(msec_str, false))
				{
					case 1:
					case 2:
						elog(LOG, "duration to dispatch result received from %d (seg %d): %s ms",
720
							 i + 1, dispatchResult->segdbDesc->segindex, msec_str);
721 722 723 724 725 726 727 728 729
						break;
				}
			}

			if (PQisBusy(dispatchResult->segdbDesc->conn))
				elog(LOG, "We thought we were done, because finished==true, but libpq says we are still busy");
		}
		else
			ELOG_DISPATCHER_DEBUG("processResults says we have more to do with %d of %d (%s)",
730
								  i + 1, pParms->dispatchCount, segdbDesc->whoami);
731 732 733 734 735 736 737
	}
}

/*
 * Send finish or cancel signal to QEs if needed.
 */
static void
738
signalQEs(CdbDispatchCmdAsync *pParms)
739
{
740
	int			i;
741
	DispatchWaitMode waitMode = pParms->waitMode;
742 743 744

	for (i = 0; i < pParms->dispatchCount; i++)
	{
745 746
		char		errbuf[256];
		bool		sent = false;
747
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
748

749 750 751 752
		Assert(dispatchResult != NULL);
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
753 754
		 * Don't send the signal if - QE is finished or canceled - the signal
		 * was already sent - connection is dead
755 756
		 */

757 758 759 760
		if (!dispatchResult->stillRunning ||
			dispatchResult->wasCanceled ||
			cdbconn_isBadConnection(segdbDesc))
			continue;
761

762 763 764 765 766
		memset(errbuf, 0, sizeof(errbuf));

		sent = cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL);
		if (sent)
			dispatchResult->sentSignal = waitMode;
767
		else
768 769
			elog(LOG, "Unable to cancel: %s",
				 strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
770 771 772 773 774 775 776
	}
}

/*
 * Check if any segment DB down is detected by FTS.
 */
static void
777
checkSegmentAlive(CdbDispatchCmdAsync *pParms)
778
{
779
	int			i;
780 781

	/*
A
Ashwin Agrawal 已提交
782
	 * check the connection still valid
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Skip if already finished or didn't dispatch.
		 */
		if (!dispatchResult->stillRunning)
			continue;

		/*
		 * Skip the entry db.
		 */
		if (segdbDesc->segindex < 0)
			continue;

		ELOG_DISPATCHER_DEBUG("FTS testing connection %d of %d (%s)",
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);

804
		if (FtsIsSegmentDown(segdbDesc->segment_database_info))
805
		{
806 807
			char	   *msg = PQerrorMessage(segdbDesc->conn);

808 809
			dispatchResult->stillRunning = false;
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
810 811
										   "FTS detected connection lost during dispatch to %s: %s",
										   dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error");
812

813
			/*
814 815
			 * Not a good idea to store into the PGconn object. Instead, just
			 * close it.
816 817
			 */
			PQfinish(segdbDesc->conn);
818
			segdbDesc->conn = NULL;
819 820 821 822
		}
	}
}

D
David Kimura 已提交
823 824 825
static inline void
send_sequence_response(PGconn *conn, Oid oid, int64 last, int64 cached, int64 increment, bool overflow, bool error)
{
826 827
	if (pqPutMsgStart(SEQ_NEXTVAL_QUERY_RESPONSE, false, conn) < 0)
		elog(ERROR, "Failed to send sequence response: %s", PQerrorMessage(conn));
D
David Kimura 已提交
828 829 830 831 832 833 834 835 836
	pqPutInt(oid, 4, conn);
	pqPutInt(last >> 32, 4, conn);
	pqPutInt(last, 4, conn);
	pqPutInt(cached >> 32, 4, conn);
	pqPutInt(cached, 4, conn);
	pqPutInt(increment >> 32, 4, conn);
	pqPutInt(increment, 4, conn);
	pqPutc(overflow ? SEQ_NEXTVAL_TRUE : SEQ_NEXTVAL_FALSE, conn);
	pqPutc(error ? SEQ_NEXTVAL_TRUE : SEQ_NEXTVAL_FALSE, conn);
837 838 839 840
	if (pqPutMsgEnd(conn) < 0)
		elog(ERROR, "Failed to send sequence response: %s", PQerrorMessage(conn));
	if (pqFlush(conn) < 0)
		elog(ERROR, "Failed to send sequence response: %s", PQerrorMessage(conn));
D
David Kimura 已提交
841 842
}

843 844 845 846 847 848 849
/*
 * Receive and process input from one QE.
 *
 * Return true if all input are consumed or the connection went wrong.
 * Return false if there'er still more data expected.
 */
static bool
850
processResults(CdbDispatchResult *dispatchResult)
851 852
{
	SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
853
	char	   *msg;
854 855 856 857 858 859 860 861

	/*
	 * Receive input from QE.
	 */
	if (PQconsumeInput(segdbDesc->conn) == 0)
	{
		msg = PQerrorMessage(segdbDesc->conn);
		cdbdisp_appendMessageNonThread(dispatchResult, LOG,
862 863
									   "Error on receive from %s: %s",
									   segdbDesc->whoami, msg ? msg : "unknown error");
864 865
		return true;
	}
866
	forwardQENotices();
867 868 869 870 871 872 873

	/*
	 * If we have received one or more complete messages, process them.
	 */
	while (!PQisBusy(segdbDesc->conn))
	{
		/* loop to call PQgetResult; won't block */
874
		PGresult   *pRes;
875
		ExecStatusType resultStatus;
876
		int			resultIndex;
877

878 879
		forwardQENotices();

880
		/*
881 882 883
		 * PQisBusy() does some error handling, which can cause the connection
		 * to die -- we can't just continue on as if the connection is happy
		 * without checking first.
884
		 *
885 886
		 * For example, cdbdisp_numPGresult() will return a completely bogus
		 * value!
887 888 889 890 891
		 */
		if (cdbconn_isBadConnection(segdbDesc))
		{
			msg = PQerrorMessage(segdbDesc->conn);
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
892 893
										   "Connection lost when receiving from %s: %s",
										   segdbDesc->whoami, msg ? msg : "unknown error");
894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
			return true;
		}

		/*
		 * Get one message.
		 */
		ELOG_DISPATCHER_DEBUG("PQgetResult");
		pRes = PQgetResult(segdbDesc->conn);

		/*
		 * Command is complete when PGgetResult() returns NULL. It is critical
		 * that for any connection that had an asynchronous command sent thru
		 * it, we call PQgetResult until it returns NULL. Otherwise, the next
		 * time a command is sent to that connection, it will return an error
		 * that there's a command pending.
		 */
		if (!pRes)
		{
			ELOG_DISPATCHER_DEBUG("%s -> idle", segdbDesc->whoami);
			/* this is normal end of command */
			return true;
		}

X
xiong-gang 已提交
917
		if (segdbDesc->conn->wrote_xlog)
918
		{
919
			MarkTopTransactionWriteXLogOnExecutor();
X
xiong-gang 已提交
920

921 922 923 924 925 926 927 928 929
			/*
			 * Reset the worte_xlog here. Since if the received pgresult not process
			 * the xlog write message('x' message sends from QE in ReadyForQuery),
			 * the value may still refer to previous dispatch statement. Which may
			 * always mark current top transaction has wrote xlog on executor.
			 */
			segdbDesc->conn->wrote_xlog = false;
		}

930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946
		/*
		 * Attach the PGresult object to the CdbDispatchResult object.
		 */
		resultIndex = cdbdisp_numPGresult(dispatchResult);
		cdbdisp_appendResult(dispatchResult, pRes);

		/*
		 * Did a command complete successfully?
		 */
		resultStatus = PQresultStatus(pRes);
		if (resultStatus == PGRES_COMMAND_OK ||
			resultStatus == PGRES_TUPLES_OK ||
			resultStatus == PGRES_COPY_IN ||
			resultStatus == PGRES_COPY_OUT ||
			resultStatus == PGRES_EMPTY_QUERY)
		{
			ELOG_DISPATCHER_DEBUG("%s -> ok %s",
947 948
								  segdbDesc->whoami,
								  PQcmdStatus(pRes) ? PQcmdStatus(pRes) : "(no cmdStatus)");
949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964

			if (resultStatus == PGRES_EMPTY_QUERY)
				ELOG_DISPATCHER_DEBUG("QE received empty query.");

			/*
			 * Save the index of the last successful PGresult. Can be given to
			 * cdbdisp_getPGresult() to get tuple count, etc.
			 */
			dispatchResult->okindex = resultIndex;

			/*
			 * SREH - get number of rows rejected from QE if any
			 */
			if (pRes->numRejected > 0)
				dispatchResult->numrowsrejected += pRes->numRejected;

965
			/*
966 967
			 * COPY FROM ON SEGMENT - get the number of rows completed by QE
			 * if any
968 969 970 971
			 */
			if (pRes->numCompleted > 0)
				dispatchResult->numrowscompleted += pRes->numCompleted;

972 973 974 975
			if (resultStatus == PGRES_COPY_IN ||
				resultStatus == PGRES_COPY_OUT)
				return true;
		}
976

977 978 979 980 981 982 983 984 985 986 987 988
		/*
		 * Note QE error. Cancel the whole statement if requested.
		 */
		else
		{
			/* QE reported an error */
			char	   *sqlstate = PQresultErrorField(pRes, PG_DIAG_SQLSTATE);
			int			errcode = 0;

			msg = PQresultErrorMessage(pRes);

			ELOG_DISPATCHER_DEBUG("%s -> %s %s  %s",
989 990 991 992
								  segdbDesc->whoami,
								  PQresStatus(resultStatus),
								  sqlstate ? sqlstate : "(no SQLSTATE)",
								  msg);
993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008

			/*
			 * Convert SQLSTATE to an error code (ERRCODE_xxx). Use a generic
			 * nonzero error code if no SQLSTATE.
			 */
			if (sqlstate && strlen(sqlstate) == 5)
				errcode = sqlstate_to_errcode(sqlstate);

			/*
			 * Save first error code and the index of its PGresult buffer
			 * entry.
			 */
			cdbdisp_seterrcode(errcode, resultIndex, dispatchResult);
		}
	}

1009 1010
	forwardQENotices();

D
David Kimura 已提交
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
	/*
	 * If there was nextval request then respond back on this libpq connection
	 * with the next value. Check and process nextval message only if QD has not
	 * already hit the error. Since QD could have hit the error while processing
	 * the previous nextval_qd() request itself and since full error handling is
	 * not complete yet like releasing all the locks, etc.., shouldn't attempt
	 * to call nextval_qd() again.
	 */
	PGnotify *nextval = PQnotifies(segdbDesc->conn);
	if ((elog_geterrcode() == 0) && nextval &&
		strcmp(nextval->relname, "nextval") == 0)
	{
		int64 last;
		int64 cached;
		int64 increment;
		bool overflow;
		int dbid;
		int seq_oid;

		if (sscanf(nextval->extra, "%d:%d", &dbid, &seq_oid) != 2)
			elog(ERROR, "invalid nextval message");

		if (dbid != MyDatabaseId)
			elog(ERROR, "nextval message database id:%d doesn't match my database id:%d",
				 dbid, MyDatabaseId);

		PG_TRY();
		{
			nextval_qd(seq_oid, &last, &cached, &increment, &overflow);
		}
		PG_CATCH();
		{
			send_sequence_response(segdbDesc->conn, seq_oid, last, cached, increment, overflow, true /* error */);
			PG_RE_THROW();
		}
		PG_END_TRY();
		/*
		 * respond back on this libpq connection with the next value
		 */
		send_sequence_response(segdbDesc->conn, seq_oid, last, cached, increment, overflow, false /* error */);
	}
	if (nextval)
		PQfreemem(nextval);

1055 1056
	forwardQENotices();

1057
	return false;				/* we must keep on monitoring this socket */
1058
}