diff --git a/src/backend/cdb/cdbcopy.c b/src/backend/cdb/cdbcopy.c index 1bc2b6ebb6ac1962621b539d3e1d5ed20aad3001..8e879ec06073a8fdaca34f86f259348d7a2994e5 100644 --- a/src/backend/cdb/cdbcopy.c +++ b/src/backend/cdb/cdbcopy.c @@ -52,7 +52,6 @@ CdbCopy * makeCdbCopy(bool is_copy_in) { CdbCopy *c; - int seg; c = palloc0(sizeof(CdbCopy)); @@ -68,22 +67,12 @@ makeCdbCopy(bool is_copy_in) c->hasReplicatedTable = false; c->dispatcherState = NULL; initStringInfo(&(c->err_msg)); - initStringInfo(&(c->err_context)); initStringInfo(&(c->copy_out_buf)); /* init total_segs */ c->total_segs = getgpsegmentCount(); c->aotupcounts = NULL; - /* Initialize the state of each segment database */ - c->segdb_state = (SegDbState **) palloc((c->total_segs) * sizeof(SegDbState *)); - - for (seg = 0; seg < c->total_segs; seg++) - { - c->segdb_state[seg] = (SegDbState *) palloc(2 * sizeof(SegDbState)); - c->segdb_state[seg][0] = SEGDB_IDLE; /* Primary can't be OUT */ - } - /* init seg list for copy out */ if (!c->copy_in) { @@ -105,7 +94,7 @@ makeCdbCopy(bool is_copy_in) void cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy) { - int seg; + int flags; /* clean err message */ c->err_msg.len = 0; @@ -132,16 +121,13 @@ cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy) GP_POLICY_ALL_NUMSEGMENTS); } - CdbDispatchCopyStart(c, (Node *)stmt, - (c->copy_in ? DF_NEED_TWO_PHASE | DF_WITH_SNAPSHOT : DF_WITH_SNAPSHOT) | DF_CANCEL_ON_ERROR); + flags = DF_WITH_SNAPSHOT | DF_CANCEL_ON_ERROR; + if (c->copy_in) + flags |= DF_NEED_TWO_PHASE; - SIMPLE_FAULT_INJECTOR(CdbCopyStartAfterDispatch); + CdbDispatchCopyStart(c, (Node *) stmt, flags); - /* fill in CdbCopy structure */ - for (seg = 0; seg < c->total_segs; seg++) - { - c->segdb_state[seg][0] = SEGDB_COPY; /* we be jammin! */ - } + SIMPLE_FAULT_INJECTOR(CdbCopyStartAfterDispatch); } /* @@ -392,26 +378,22 @@ processCopyEndResults(CdbCopy *c, SegmentDatabaseDescriptor **db_descriptors, int *results, int size, - SegmentDatabaseDescriptor **failedSegDBs, bool *err_header, int *failed_count, int *total_rows_rejected, int64 *total_rows_completed) { - SegmentDatabaseDescriptor *q; int seg; - PGresult *res; struct pollfd *pollRead = (struct pollfd *) palloc(sizeof(struct pollfd)); - int segment_rows_rejected = 0; /* num of rows rejected by this QE */ - int segment_rows_completed = 0; /* num of rows completed by this - * QE */ ErrorData *first_error = NULL; for (seg = 0; seg < size; seg++) { + SegmentDatabaseDescriptor *q = db_descriptors[seg]; int result = results[seg]; - - q = db_descriptors[seg]; + int64 segment_rows_rejected = 0; /* num of rows rejected by this QE */ + int64 segment_rows_completed = 0; /* num of rows completed by this QE */ + PGresult *res; /* get command end status */ if (result == 0) @@ -554,27 +536,17 @@ processCopyEndResults(CdbCopy *c, } RESUME_INTERRUPTS(); - /* Finished with this segment db. */ - c->segdb_state[seg][0] = SEGDB_DONE; - /* - * add number of rows rejected from this segment to the total of - * rejected rows. Only count from primary segs. + * Finished with this segment db. + * + * Add the number of rows completed and rejected from this segment + * to the totals. Only count from primary segs. */ if (segment_rows_rejected > 0) *total_rows_rejected += segment_rows_rejected; - - segment_rows_rejected = 0; - - /* - * add number of rows completed from this segment to the total of - * completed rows. Only count from primary segs - */ if ((NULL != total_rows_completed) && (segment_rows_completed > 0)) *total_rows_completed += segment_rows_completed; - segment_rows_completed = 0; - /* Lost the connection? */ if (PQstatus(q->conn) == CONNECTION_BAD) { @@ -593,7 +565,6 @@ processCopyEndResults(CdbCopy *c, q->conn = NULL; /* Let FTS deal with it! */ - failedSegDBs[*failed_count] = q; (*failed_count)++; } } @@ -619,7 +590,6 @@ int cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort_msg) { SegmentDatabaseDescriptor *q; - SegmentDatabaseDescriptor **failedSegDBs; Gang *gp; int *results; /* final result of COPY command execution */ int seg; @@ -649,9 +619,6 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort c->err_msg.data[0] = '\0'; c->err_msg.cursor = 0; - /* allocate a failed segment database pointer array */ - failedSegDBs = (SegmentDatabaseDescriptor **) palloc(c->total_segs * 2 * sizeof(SegmentDatabaseDescriptor *)); - db_descriptors = gp->db_descriptors; size = gp->size; @@ -670,7 +637,7 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort *total_rows_completed = 0; edata = processCopyEndResults(c, db_descriptors, results, size, - failedSegDBs, &err_header, + &err_header, &failed_count, &total_rows_rejected, total_rows_completed); @@ -689,7 +656,6 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort } pfree(results); - pfree(failedSegDBs); /* If we are aborting the COPY, ignore errors sent by the server. */ if (edata && !abort_msg) diff --git a/src/include/cdb/cdbcopy.h b/src/include/cdb/cdbcopy.h index 9bfac2160df8b5b5731ceaee3ec215d868bbd4e7..814f86479a00585ffa0c13292cee6a0f65c46135 100644 --- a/src/include/cdb/cdbcopy.h +++ b/src/include/cdb/cdbcopy.h @@ -25,18 +25,6 @@ struct CdbDispatcherState; -typedef enum SegDbState -{ - /* - * (it is best to avoid names like OUT that are likely to be #define'd or - * typedef'd in some platform-dependent runtime library header file) - */ - SEGDB_OUT, /* Not participating in COPY (invalid etc...) */ - SEGDB_IDLE, /* Participating but COPY not yet started */ - SEGDB_COPY, /* COPY in progress */ - SEGDB_DONE /* COPY completed (with or without errors) */ -} SegDbState; - typedef struct CdbCopy { int total_segs; /* total number of segments in cdb */ @@ -46,10 +34,7 @@ typedef struct CdbCopy * communicate with segDB's */ bool skip_ext_partition;/* skip external partition */ - SegDbState **segdb_state; - StringInfoData err_msg; /* error message for cdbcopy operations */ - StringInfoData err_context; /* error context from QE error */ StringInfoData copy_out_buf;/* holds a chunk of data from the database */ List *outseglist; /* segs that currently take part in copy out.