/*--------------------------------------------------------------------------
 *
 * cdbcopy.c
 * 	 Provides routines that execute a COPY command on an MPP cluster. These
 * 	 routines are called from the backend COPY command whenever MPP is in the
 * 	 default dispatch mode.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbcopy.c
 *
 *--------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"
20 21
#include "libpq-fe.h"
#include "libpq-int.h"
22 23
#include "cdb/cdbconn.h"
#include "cdb/cdbcopy.h"
24
#include "cdb/cdbdisp_query.h"
25 26 27 28 29 30
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
A
Ashwin Agrawal 已提交
31
#include "storage/pmsignal.h"
32 33 34 35
#include "tcop/tcopprot.h"
#include "utils/faultinjector.h"
#include "utils/memutils.h"

36 37
#include <poll.h>

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/*
 * Create a cdbCopy object that includes all the cdb
 * information and state needed by the backend COPY.
 */
CdbCopy *
makeCdbCopy(bool is_copy_in)
{
	CdbCopy    *c;
	int			seg;

	c = palloc(sizeof(CdbCopy));

	/* fresh start */
	c->total_segs = 0;
	c->primary_writer = NULL;
	c->io_errors = false;
	c->copy_in = is_copy_in;
	c->skip_ext_partition = false;
	c->outseglist = NIL;
	c->partitions = NULL;
	c->ao_segnos = NIL;
P
Pengzhou Tang 已提交
59
	c->hasReplicatedTable = false;
60 61
	initStringInfo(&(c->err_msg));
	initStringInfo(&(c->err_context));
A
Ashwin Agrawal 已提交
62 63
	initStringInfo(&(c->copy_out_buf));

64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
	/* init total_segs */
	c->total_segs = getgpsegmentCount();
	c->aotupcounts = NULL;

	Assert(c->total_segs >= 0);

	/* Initialize the state of each segment database */
	c->segdb_state = (SegDbState **) palloc((c->total_segs) * sizeof(SegDbState *));

	for (seg = 0; seg < c->total_segs; seg++)
	{
		c->segdb_state[seg] = (SegDbState *) palloc(2 * sizeof(SegDbState));
		c->segdb_state[seg][0] = SEGDB_IDLE;	/* Primary can't be OUT */
	}

	/* init gangs */
80
	c->primary_writer = AllocateWriterGang();
A
Ashwin Agrawal 已提交
81

82 83 84
	/* init seg list for copy out */
	if (!c->copy_in)
	{
A
Ashwin Agrawal 已提交
85 86
		int			i;

87 88 89 90 91 92 93 94 95 96
		for (i = 0; i < c->total_segs; i++)
			c->outseglist = lappend_int(c->outseglist, i);
	}

	return c;
}


/*
 * starts a copy command on a specific segment database.
A
Ashwin Agrawal 已提交
97
 *
98 99 100
 * may pg_throw via elog/ereport.
 */
void
A
Asim R P 已提交
101
cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy)
102 103
{
	int			seg;
A
Ashwin Agrawal 已提交
104

105 106 107 108
	/* clean err message */
	c->err_msg.len = 0;
	c->err_msg.data[0] = '\0';
	c->err_msg.cursor = 0;
A
Ashwin Agrawal 已提交
109

A
Asim R P 已提交
110
	stmt = copyObject(stmt);
111 112

	/* add in partitions for dispatch */
A
Asim R P 已提交
113
	stmt->partitions = c->partitions;
A
Ashwin Agrawal 已提交
114

115
	/* add in AO segno map for dispatch */
A
Asim R P 已提交
116
	stmt->ao_segnos = c->ao_segnos;
117

A
Asim R P 已提交
118
	stmt->skip_ext_partition = c->skip_ext_partition;
119 120 121

	if (policy)
	{
A
Asim R P 已提交
122
		stmt->policy = GpPolicyCopy(CurrentMemoryContext, policy);
123 124 125
	}
	else
	{
A
Asim R P 已提交
126
		stmt->policy = createRandomPartitionedPolicy(NULL);
127 128
	}

A
Asim R P 已提交
129
	CdbDispatchUtilityStatement((Node *) stmt,
130
								(c->copy_in ? DF_NEED_TWO_PHASE | DF_WITH_SNAPSHOT : DF_WITH_SNAPSHOT) | DF_CANCEL_ON_ERROR,
A
Asim R P 已提交
131
								NIL,
132
								NULL);
133 134 135 136 137 138 139 140 141 142

	SIMPLE_FAULT_INJECTOR(CdbCopyStartAfterDispatch);

	/* fill in CdbCopy structure */
	for (seg = 0; seg < c->total_segs; seg++)
	{
		c->segdb_state[seg][0] = SEGDB_COPY;	/* we be jammin! */
	}
}

A
alldefector 已提交
143 144 145 146 147 148
/*
 * sends data to a copy command on all segments.
 */
void
cdbCopySendDataToAll(CdbCopy *c, const char *buffer, int nbytes)
{
A
Ashwin Agrawal 已提交
149 150 151 152 153 154
	Gang	   *gp = c->primary_writer;

	for (int i = 0; i < gp->size; ++i)
	{
		int			seg = gp->db_descriptors[i].segindex;

A
alldefector 已提交
155 156 157 158
		cdbCopySendData(c, seg, buffer, nbytes);
	}
}

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
/*
 * sends data to a copy command on a specific segment (usually
 * the hash result of the data value).
 */
void
cdbCopySendData(CdbCopy *c, int target_seg, const char *buffer,
				int nbytes)
{
	SegmentDatabaseDescriptor *q;
	Gang	   *gp;
	int			result;

	/* clean err message */
	c->err_msg.len = 0;
	c->err_msg.data[0] = '\0';
	c->err_msg.cursor = 0;
A
Ashwin Agrawal 已提交
175 176 177 178 179

	/*
	 * NOTE!! note that another DELIM was added, for the buf_converted in the
	 * code above. I didn't do it because it's broken right now
	 */
180 181 182 183

	gp = c->primary_writer;

	q = getSegmentDescriptorFromGang(gp, target_seg);
A
Ashwin Agrawal 已提交
184

185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
	/* transmit the COPY data */
	result = PQputCopyData(q->conn, buffer, nbytes);

	if (result != 1)
	{
		if (result == 0)
			appendStringInfo(&(c->err_msg),
							 "Failed to send data to segment %d, attempt blocked\n",
							 target_seg);

		if (result == -1)
			appendStringInfo(&(c->err_msg),
							 "Failed to send data to segment %d: %s\n",
							 target_seg, PQerrorMessage(q->conn));

		c->io_errors = true;
	}
}

/*
 * gets a chunk of rows of data from a copy command.
 * returns boolean true if done. Caller should still
 * empty the leftovers in the outbuf in that case.
 */
A
Ashwin Agrawal 已提交
209 210
bool
cdbCopyGetData(CdbCopy *c, bool copy_cancel, uint64 *rows_processed)
211 212
{
	SegmentDatabaseDescriptor *q;
A
Ashwin Agrawal 已提交
213
	Gang	   *gp;
214 215
	PGresult   *res;
	int			nbytes;
A
Ashwin Agrawal 已提交
216
	bool		first_error = true;
217 218 219 220 221 222 223 224 225 226

	/* clean err message */
	c->err_msg.len = 0;
	c->err_msg.data[0] = '\0';
	c->err_msg.cursor = 0;

	/* clean out buf data */
	c->copy_out_buf.len = 0;
	c->copy_out_buf.data[0] = '\0';
	c->copy_out_buf.cursor = 0;
A
Ashwin Agrawal 已提交
227

228 229 230
	gp = c->primary_writer;

	/*
A
Ashwin Agrawal 已提交
231 232 233 234
	 * MPP-7712: we used to issue the cancel-requests for each *row* we got
	 * back from each segment -- this is potentially millions of
	 * cancel-requests. Cancel requests consist of an out-of-band connection
	 * to the segment-postmaster, this is *not* a lightweight operation!
235 236 237 238
	 */
	if (copy_cancel)
	{
		ListCell   *cur;
A
Ashwin Agrawal 已提交
239

240 241 242 243 244 245 246 247 248 249 250 251 252
		/* iterate through all the segments that still have data to give */
		foreach(cur, c->outseglist)
		{
			int			source_seg = lfirst_int(cur);

			q = getSegmentDescriptorFromGang(gp, source_seg);

			/* send a query cancel request to that segdb */
			PQrequestCancel(q->conn);
		}
	}

	/*
A
Ashwin Agrawal 已提交
253 254
	 * Collect data rows from the segments that still have rows to give until
	 * chunk minimum size is reached
255 256 257 258
	 */
	while (c->copy_out_buf.len < COPYOUT_CHUNK_SIZE)
	{
		ListCell   *cur;
A
Ashwin Agrawal 已提交
259

260 261 262 263
		/* iterate through all the segments that still have data to give */
		foreach(cur, c->outseglist)
		{
			int			source_seg = lfirst_int(cur);
A
Ashwin Agrawal 已提交
264
			char	   *buffer;
265 266 267 268 269 270

			q = getSegmentDescriptorFromGang(gp, source_seg);

			/* get 1 row of COPY data */
			nbytes = PQgetCopyData(q->conn, &buffer, false);

A
Ashwin Agrawal 已提交
271 272
			/*
			 * SUCCESS -- got a row of data
273
			 */
A
Ashwin Agrawal 已提交
274
			if (nbytes > 0 && buffer)
275 276 277 278 279 280 281 282 283 284
			{
				/* append the data row to the data chunk */
				appendBinaryStringInfo(&(c->copy_out_buf), buffer, nbytes);

				/* increment the rows processed counter for the end tag */
				(*rows_processed)++;

				PQfreemem(buffer);
			}

A
Ashwin Agrawal 已提交
285 286 287
			/*
			 * DONE -- Got all the data rows from this segment, or a cancel
			 * request.
288 289 290 291
			 */
			else if (nbytes == -1)
			{
				/*
A
Ashwin Agrawal 已提交
292 293
				 * Fetch any error status existing on completion of the COPY
				 * command.
294 295 296 297
				 */
				while (NULL != (res = PQgetResult(q->conn)))
				{
					/* if the COPY command had an error */
A
Adam Lee 已提交
298
					if (PQresultStatus(res) == PGRES_FATAL_ERROR && first_error)
299 300 301
					{
						appendStringInfo(&(c->err_msg), "Error from segment %d: %s\n",
										 source_seg, PQresultErrorMessage(res));
A
Adam Lee 已提交
302
						first_error = false;
303 304
					}

305 306 307 308 309
					if (res->numCompleted > 0)
					{
						*rows_processed += res->numCompleted;
					}

310 311 312 313
					/* free the PGresult object */
					PQclear(res);
				}

A
Ashwin Agrawal 已提交
314 315 316 317
				/*
				 * remove the segment that completed sending data from the
				 * list
				 */
318 319 320 321
				c->outseglist = list_delete_int(c->outseglist, source_seg);

				/* no more segments with data on the list. we are done */
				if (list_length(c->outseglist) == 0)
A
Ashwin Agrawal 已提交
322
					return true;	/* done */
323 324 325 326 327

				/* start over from first seg as we just changes the seg list */
				break;
			}

A
Ashwin Agrawal 已提交
328
			/*
329 330 331 332
			 * ERROR!
			 */
			else
			{
A
Ashwin Agrawal 已提交
333 334 335 336
				/*
				 * should never happen since we are blocking. Don't bother to
				 * try again, exit with error.
				 */
337 338 339 340 341 342 343
				if (nbytes == 0)
					appendStringInfo(&(c->err_msg),
									 "Failed to get data from segment %d, attempt blocked\n",
									 source_seg);

				if (nbytes == -2)
				{
A
Asim R P 已提交
344 345 346 347 348
					/* GPDB_91_MERGE_FIXME: How should we handle errors here? We used
					 * to append them to err_msg, but that doesn't seem right. Surely
					 * we should ereport()? I put in just a quick elog() for now..
					 */
					elog(ERROR, "could not dispatch COPY: %s", PQerrorMessage(q->conn));
349 350 351 352
					appendStringInfo(&(c->err_msg),
									 "Failed to get data from segment %d: %s\n",
									 source_seg, PQerrorMessage(q->conn));

A
Ashwin Agrawal 已提交
353 354 355 356
					/*
					 * remove the segment that completed sending data from the
					 * list
					 */
357 358 359 360
					c->outseglist = list_delete_int(c->outseglist, source_seg);

					/* no more segments with data on the list. we are done */
					if (list_length(c->outseglist) == 0)
A
Ashwin Agrawal 已提交
361
						return true;	/* done */
362

A
Ashwin Agrawal 已提交
363 364 365 366
					/*
					 * start over from first seg as we just changes the seg
					 * list
					 */
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
					break;
				}

				c->io_errors = true;
			}
		}

		if (c->copy_out_buf.len > COPYOUT_CHUNK_SIZE)
			break;
	}

	return false;
}

/*
 * Process the results from segments after sending the end of copy command.
 */
A
Asim R P 已提交
384
static ErrorData *
385 386 387 388 389 390 391
processCopyEndResults(CdbCopy *c,
					  SegmentDatabaseDescriptor *db_descriptors,
					  int *results,
					  int size,
					  SegmentDatabaseDescriptor **failedSegDBs,
					  bool *err_header,
					  int *failed_count,
392
					  int *total_rows_rejected,
393
					  int64 *total_rows_completed)
394 395
{
	SegmentDatabaseDescriptor *q;
A
Ashwin Agrawal 已提交
396 397
	int			seg;
	PGresult   *res;
398
	struct pollfd	*pollRead = (struct pollfd *) palloc(sizeof(struct pollfd));
A
Ashwin Agrawal 已提交
399 400 401
	int			segment_rows_rejected = 0;	/* num of rows rejected by this QE */
	int			segment_rows_completed = 0; /* num of rows completed by this
											 * QE */
A
Asim R P 已提交
402
	ErrorData *first_error = NULL;
403

A
Ashwin Agrawal 已提交
404
	for (seg = 0; seg < size; seg++)
405
	{
A
Ashwin Agrawal 已提交
406 407
		int			result = results[seg];

408
		q = &db_descriptors[seg];
A
Ashwin Agrawal 已提交
409

410 411 412 413 414 415
		/* get command end status */
		if (result == 0)
		{
			/* attempt blocked */

			/*
A
Ashwin Agrawal 已提交
416 417 418 419
			 * CDB TODO: Can this occur?  The libpq documentation says, "this
			 * case is only possible if the connection is in nonblocking
			 * mode... wait for write-ready and try again", i.e., the proper
			 * response would be to retry, not error out.
420 421 422 423 424 425
			 */
			if (!(*err_header))
				appendStringInfo(&(c->err_msg),
								 "Failed to complete COPY on the following:\n");
			*err_header = true;

A
Ashwin Agrawal 已提交
426
			appendStringInfo(&(c->err_msg), "primary segment %d, dbid %d, attempt blocked\n",
427 428 429 430
							 seg, q->segment_database_info->dbid);
			c->io_errors = true;
		}

431 432 433 434
		pollRead->fd = PQsocket(q->conn);
		pollRead->events = POLLIN;
		pollRead->revents = 0;

435
		while (PQisBusy(q->conn) && PQstatus(q->conn) == CONNECTION_OK)
436 437 438 439 440 441 442 443 444 445 446 447
		{
			if ((Gp_role == GP_ROLE_DISPATCH) && InterruptPending)
			{
				PQrequestCancel(q->conn);
			}

			if (poll(pollRead, 1, 200) > 0)
			{
				break;
			}
		}

448 449 450 451
		/*
		 * Fetch any error status existing on completion of the COPY command.
		 * It is critical that for any connection that had an asynchronous
		 * command sent thru it, we call PQgetResult until it returns NULL.
A
Ashwin Agrawal 已提交
452 453
		 * Otherwise, the next time a command is sent to that connection, it
		 * will return an error that there's a command pending.
454
		 */
455
		HOLD_INTERRUPTS();
456 457
		while ((res = PQgetResult(q->conn)) != NULL && PQstatus(q->conn) != CONNECTION_BAD)
		{
A
Ashwin Agrawal 已提交
458
			elog(DEBUG1, "PQgetResult got status %d seg %d    ",
459 460 461 462 463 464 465 466 467 468 469 470 471 472
				 PQresultStatus(res), q->segindex);
			/* if the COPY command had a data error */
			if (PQresultStatus(res) == PGRES_FATAL_ERROR)
			{
				/*
				 * Always append error from the primary. Append error from
				 * mirror only if its primary didn't have an error.
				 *
				 * For now, we only report the first error we get from the
				 * QE's.
				 *
				 * We get the error message in pieces so that we could append
				 * whoami to the primary error message only.
				 */
A
Asim R P 已提交
473 474
				if (!first_error)
					first_error = cdbdisp_get_PQerror(res);
475 476 477 478 479
			}

			/*
			 * If we are still in copy mode, tell QE to stop it.  COPY_IN
			 * protocol has a way to say 'end of copy' but COPY_OUT doesn't.
A
Ashwin Agrawal 已提交
480 481
			 * We have no option but sending cancel message and consume the
			 * output until the state transition to non-COPY.
482 483 484 485 486 487 488 489 490
			 */
			if (PQresultStatus(res) == PGRES_COPY_IN)
			{
				elog(LOG, "Segment still in copy in, retrying the putCopyEnd");
				result = PQputCopyEnd(q->conn, NULL);
			}
			else if (PQresultStatus(res) == PGRES_COPY_OUT)
			{
				char	   *buffer = NULL;
D
Daniel Gustafsson 已提交
491
				int			ret;
492 493

				elog(LOG, "Segment still in copy out, canceling QE");
A
Ashwin Agrawal 已提交
494

495
				/*
A
Ashwin Agrawal 已提交
496 497 498 499 500 501 502
				 * I'm a bit worried about sending a cancel, as if this is a
				 * success case the QE gets inconsistent state than QD.  But
				 * this code path is mostly for error handling and in a
				 * success case we wouldn't see COPY_OUT here. It's not clear
				 * what to do if this cancel failed, since this is not a path
				 * we can error out.  FATAL maybe the way, but I leave it for
				 * now.
503 504 505
				 */
				PQrequestCancel(q->conn);

D
Daniel Gustafsson 已提交
506 507 508
				/*
				 * Need to consume data from the QE until cancellation is
				 * recognized. PQgetCopyData() returns -1 when the COPY is
A
Ashwin Agrawal 已提交
509 510
				 * done, a non-zero result indicates data was returned and in
				 * that case we'll drop it immediately since we aren't
D
Daniel Gustafsson 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
				 * interested in the contents.
				 */
				while ((ret = PQgetCopyData(q->conn, &buffer, false)) != -1)
				{
					if (ret > 0)
					{
						if (buffer)
							PQfreemem(buffer);
						continue;
					}

					/* An error occurred, log the error and break out */
					if (ret == -2)
					{
						ereport(WARNING,
								(errmsg("Error during cancellation: \"%s\"",
A
Ashwin Agrawal 已提交
527
										PQerrorMessage(q->conn))));
D
Daniel Gustafsson 已提交
528 529 530
						break;
					}
				}
531 532 533 534 535 536
			}

			/* in SREH mode, check if this seg rejected (how many) rows */
			if (res->numRejected > 0)
				segment_rows_rejected = res->numRejected;

537 538 539 540 541 542 543
			/*
			 * When COPY FROM ON SEGMENT, need to calculate the number of this
			 * segment's completed rows
			 */
			if (res->numCompleted > 0)
				segment_rows_completed = res->numCompleted;

544
			/* Get AO tuple counts */
545
			c->aotupcounts = PQprocessAoTupCounts(c->partitions, c->aotupcounts, res->aotupcounts, res->naotupcounts);
546 547 548
			/* free the PGresult object */
			PQclear(res);
		}
549
		RESUME_INTERRUPTS();
550 551 552 553

		/* Finished with this segment db. */
		c->segdb_state[seg][0] = SEGDB_DONE;

A
Ashwin Agrawal 已提交
554 555 556
		/*
		 * add number of rows rejected from this segment to the total of
		 * rejected rows. Only count from primary segs.
557 558 559
		 */
		if (segment_rows_rejected > 0)
			*total_rows_rejected += segment_rows_rejected;
A
Ashwin Agrawal 已提交
560

561
		segment_rows_rejected = 0;
A
Ashwin Agrawal 已提交
562

563
		/*
A
Ashwin Agrawal 已提交
564 565
		 * add number of rows completed from this segment to the total of
		 * completed rows. Only count from primary segs
566 567 568 569 570 571
		 */
		if ((NULL != total_rows_completed) && (segment_rows_completed > 0))
			*total_rows_completed += segment_rows_completed;

		segment_rows_completed = 0;

572 573 574 575 576 577 578
		/* Lost the connection? */
		if (PQstatus(q->conn) == CONNECTION_BAD)
		{
			if (!*(err_header))
				appendStringInfo(&(c->err_msg),
								 "ERROR - Failed to complete COPY on the following:\n");
			*err_header = true;
A
Ashwin Agrawal 已提交
579

580 581 582 583
			/* command error */
			c->io_errors = true;
			appendStringInfo(&(c->err_msg), "Primary segment %d, dbid %d, with error: %s\n",
							 seg, q->segment_database_info->dbid, PQerrorMessage(q->conn));
A
Ashwin Agrawal 已提交
584

585 586 587
			/* Free the PGconn object. */
			PQfinish(q->conn);
			q->conn = NULL;
A
Ashwin Agrawal 已提交
588

589 590 591 592 593
			/* Let FTS deal with it! */
			failedSegDBs[*failed_count] = q;
			(*failed_count)++;
		}
	}
A
Asim R P 已提交
594 595

	return first_error;
596 597
}

A
Asim R P 已提交
598 599 600 601 602
int
cdbCopyAbort(CdbCopy *c)
{
	return cdbCopyEndAndFetchRejectNum(c, NULL,
									   "aborting COPY in QE due to error in QD");
603 604 605 606 607
}

/*
 * End the copy command on all segment databases,
 * and fetch the total number of rows completed by all QEs
A
Asim R P 已提交
608 609 610
 * 
 * GPDB_91_MERGE_FIXME: we allow % value to be specified as segment reject
 * limit, however, the total rejected rows is not allowed to be > INT_MAX.
611 612
 */
int
A
Asim R P 已提交
613
cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort_msg)
614 615 616 617
{
	SegmentDatabaseDescriptor *q;
	SegmentDatabaseDescriptor **failedSegDBs;
	Gang	   *gp;
A
Ashwin Agrawal 已提交
618
	int		   *results;		/* final result of COPY command execution */
619 620 621
	int			seg;

	int			failed_count = 0;
A
Ashwin Agrawal 已提交
622 623
	int			total_rows_rejected = 0;	/* total num rows rejected by all
											 * QEs */
624
	bool		err_header = false;
A
Ashwin Agrawal 已提交
625 626
	struct SegmentDatabaseDescriptor *db_descriptors;
	int			size;
A
Asim R P 已提交
627 628 629 630 631 632 633 634 635 636 637 638
	ErrorData *edata;

	/*
	 * Don't try to end a copy that already ended with the destruction of the
	 * writer gang. We know that this has happened if the CdbCopy's
	 * primary_writer is NULL.
	 *
	 * GPDB_91_MERGE_FIXME: ugh, this is nasty. We shouldn't be calling
	 * cdbCopyEnd twice on the same CdbCopy in the first place!
	 */
	if (!c->primary_writer)
		return -1;
639 640 641 642 643 644 645 646 647 648 649 650 651 652

	/* clean err message */
	c->err_msg.len = 0;
	c->err_msg.data[0] = '\0';
	c->err_msg.cursor = 0;

	/* allocate a failed segment database pointer array */
	failedSegDBs = (SegmentDatabaseDescriptor **) palloc(c->total_segs * 2 * sizeof(SegmentDatabaseDescriptor *));

	gp = c->primary_writer;
	db_descriptors = gp->db_descriptors;
	size = gp->size;

	/* results from each segment */
A
Ashwin Agrawal 已提交
653
	results = (int *) palloc0(sizeof(int) * size);
654 655 656 657

	for (seg = 0; seg < size; seg++)
	{
		q = &db_descriptors[seg];
A
Ashwin Agrawal 已提交
658
		elog(DEBUG1, "PQputCopyEnd seg %d    ", q->segindex);
659
		/* end this COPY command */
A
Asim R P 已提交
660
		results[seg] = PQputCopyEnd(q->conn, abort_msg);
661 662
	}

663 664 665
	if (NULL != total_rows_completed)
		*total_rows_completed = 0;

A
Asim R P 已提交
666 667 668 669
	edata = processCopyEndResults(c, db_descriptors, results, size,
								  failedSegDBs, &err_header,
								  &failed_count, &total_rows_rejected,
								  total_rows_completed);
670 671 672 673 674

	/* If lost contact with segment db, try to reconnect. */
	if (failed_count > 0)
	{
		elog(LOG, "%s", c->err_msg.data);
A
Ashwin Agrawal 已提交
675 676 677
		elog(LOG, "COPY signals FTS to probe segments");
		SendPostmasterSignal(PMSIGNAL_WAKEN_FTS);
		DisconnectAndDestroyAllGangs(true);
A
Asim R P 已提交
678
		c->primary_writer = NULL;
A
Ashwin Agrawal 已提交
679 680 681
		ereport(ERROR,
				(errmsg_internal("MPP detected %d segment failures, system is reconnected", failed_count),
				 errSendAlert(true)));
682 683 684 685 686
	}

	pfree(results);
	pfree(failedSegDBs);

A
Asim R P 已提交
687 688 689 690
	/* If we are aborting the COPY, ignore errors sent by the server. */
	if (edata && !abort_msg)
		ReThrowError(edata);

691 692
	return total_rows_rejected;
}