提交 976b0f01 编写于 作者: H Heikki Linnakangas

Cleanups in cdbcopy.c

* 'segdb_state' and 'err_context' fields in CdbCopy were unused, remove.

* 'failedSegDBs' in processCopEndResults was unused, remove.

* Plus some other cosmetic cleanup, for better readability.
上级 dc78e56c
...@@ -52,7 +52,6 @@ CdbCopy * ...@@ -52,7 +52,6 @@ CdbCopy *
makeCdbCopy(bool is_copy_in) makeCdbCopy(bool is_copy_in)
{ {
CdbCopy *c; CdbCopy *c;
int seg;
c = palloc0(sizeof(CdbCopy)); c = palloc0(sizeof(CdbCopy));
...@@ -68,22 +67,12 @@ makeCdbCopy(bool is_copy_in) ...@@ -68,22 +67,12 @@ makeCdbCopy(bool is_copy_in)
c->hasReplicatedTable = false; c->hasReplicatedTable = false;
c->dispatcherState = NULL; c->dispatcherState = NULL;
initStringInfo(&(c->err_msg)); initStringInfo(&(c->err_msg));
initStringInfo(&(c->err_context));
initStringInfo(&(c->copy_out_buf)); initStringInfo(&(c->copy_out_buf));
/* init total_segs */ /* init total_segs */
c->total_segs = getgpsegmentCount(); c->total_segs = getgpsegmentCount();
c->aotupcounts = NULL; c->aotupcounts = NULL;
/* Initialize the state of each segment database */
c->segdb_state = (SegDbState **) palloc((c->total_segs) * sizeof(SegDbState *));
for (seg = 0; seg < c->total_segs; seg++)
{
c->segdb_state[seg] = (SegDbState *) palloc(2 * sizeof(SegDbState));
c->segdb_state[seg][0] = SEGDB_IDLE; /* Primary can't be OUT */
}
/* init seg list for copy out */ /* init seg list for copy out */
if (!c->copy_in) if (!c->copy_in)
{ {
...@@ -105,7 +94,7 @@ makeCdbCopy(bool is_copy_in) ...@@ -105,7 +94,7 @@ makeCdbCopy(bool is_copy_in)
void void
cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy) cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy)
{ {
int seg; int flags;
/* clean err message */ /* clean err message */
c->err_msg.len = 0; c->err_msg.len = 0;
...@@ -132,16 +121,13 @@ cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy) ...@@ -132,16 +121,13 @@ cdbCopyStart(CdbCopy *c, CopyStmt *stmt, struct GpPolicy *policy)
GP_POLICY_ALL_NUMSEGMENTS); GP_POLICY_ALL_NUMSEGMENTS);
} }
CdbDispatchCopyStart(c, (Node *)stmt, flags = DF_WITH_SNAPSHOT | DF_CANCEL_ON_ERROR;
(c->copy_in ? DF_NEED_TWO_PHASE | DF_WITH_SNAPSHOT : DF_WITH_SNAPSHOT) | DF_CANCEL_ON_ERROR); if (c->copy_in)
flags |= DF_NEED_TWO_PHASE;
SIMPLE_FAULT_INJECTOR(CdbCopyStartAfterDispatch); CdbDispatchCopyStart(c, (Node *) stmt, flags);
/* fill in CdbCopy structure */ SIMPLE_FAULT_INJECTOR(CdbCopyStartAfterDispatch);
for (seg = 0; seg < c->total_segs; seg++)
{
c->segdb_state[seg][0] = SEGDB_COPY; /* we be jammin! */
}
} }
/* /*
...@@ -392,26 +378,22 @@ processCopyEndResults(CdbCopy *c, ...@@ -392,26 +378,22 @@ processCopyEndResults(CdbCopy *c,
SegmentDatabaseDescriptor **db_descriptors, SegmentDatabaseDescriptor **db_descriptors,
int *results, int *results,
int size, int size,
SegmentDatabaseDescriptor **failedSegDBs,
bool *err_header, bool *err_header,
int *failed_count, int *failed_count,
int *total_rows_rejected, int *total_rows_rejected,
int64 *total_rows_completed) int64 *total_rows_completed)
{ {
SegmentDatabaseDescriptor *q;
int seg; int seg;
PGresult *res;
struct pollfd *pollRead = (struct pollfd *) palloc(sizeof(struct pollfd)); struct pollfd *pollRead = (struct pollfd *) palloc(sizeof(struct pollfd));
int segment_rows_rejected = 0; /* num of rows rejected by this QE */
int segment_rows_completed = 0; /* num of rows completed by this
* QE */
ErrorData *first_error = NULL; ErrorData *first_error = NULL;
for (seg = 0; seg < size; seg++) for (seg = 0; seg < size; seg++)
{ {
SegmentDatabaseDescriptor *q = db_descriptors[seg];
int result = results[seg]; int result = results[seg];
int64 segment_rows_rejected = 0; /* num of rows rejected by this QE */
q = db_descriptors[seg]; int64 segment_rows_completed = 0; /* num of rows completed by this QE */
PGresult *res;
/* get command end status */ /* get command end status */
if (result == 0) if (result == 0)
...@@ -554,27 +536,17 @@ processCopyEndResults(CdbCopy *c, ...@@ -554,27 +536,17 @@ processCopyEndResults(CdbCopy *c,
} }
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
/* Finished with this segment db. */
c->segdb_state[seg][0] = SEGDB_DONE;
/* /*
* add number of rows rejected from this segment to the total of * Finished with this segment db.
* rejected rows. Only count from primary segs. *
* Add the number of rows completed and rejected from this segment
* to the totals. Only count from primary segs.
*/ */
if (segment_rows_rejected > 0) if (segment_rows_rejected > 0)
*total_rows_rejected += segment_rows_rejected; *total_rows_rejected += segment_rows_rejected;
segment_rows_rejected = 0;
/*
* add number of rows completed from this segment to the total of
* completed rows. Only count from primary segs
*/
if ((NULL != total_rows_completed) && (segment_rows_completed > 0)) if ((NULL != total_rows_completed) && (segment_rows_completed > 0))
*total_rows_completed += segment_rows_completed; *total_rows_completed += segment_rows_completed;
segment_rows_completed = 0;
/* Lost the connection? */ /* Lost the connection? */
if (PQstatus(q->conn) == CONNECTION_BAD) if (PQstatus(q->conn) == CONNECTION_BAD)
{ {
...@@ -593,7 +565,6 @@ processCopyEndResults(CdbCopy *c, ...@@ -593,7 +565,6 @@ processCopyEndResults(CdbCopy *c,
q->conn = NULL; q->conn = NULL;
/* Let FTS deal with it! */ /* Let FTS deal with it! */
failedSegDBs[*failed_count] = q;
(*failed_count)++; (*failed_count)++;
} }
} }
...@@ -619,7 +590,6 @@ int ...@@ -619,7 +590,6 @@ int
cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort_msg) cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort_msg)
{ {
SegmentDatabaseDescriptor *q; SegmentDatabaseDescriptor *q;
SegmentDatabaseDescriptor **failedSegDBs;
Gang *gp; Gang *gp;
int *results; /* final result of COPY command execution */ int *results; /* final result of COPY command execution */
int seg; int seg;
...@@ -649,9 +619,6 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort ...@@ -649,9 +619,6 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort
c->err_msg.data[0] = '\0'; c->err_msg.data[0] = '\0';
c->err_msg.cursor = 0; c->err_msg.cursor = 0;
/* allocate a failed segment database pointer array */
failedSegDBs = (SegmentDatabaseDescriptor **) palloc(c->total_segs * 2 * sizeof(SegmentDatabaseDescriptor *));
db_descriptors = gp->db_descriptors; db_descriptors = gp->db_descriptors;
size = gp->size; size = gp->size;
...@@ -670,7 +637,7 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort ...@@ -670,7 +637,7 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort
*total_rows_completed = 0; *total_rows_completed = 0;
edata = processCopyEndResults(c, db_descriptors, results, size, edata = processCopyEndResults(c, db_descriptors, results, size,
failedSegDBs, &err_header, &err_header,
&failed_count, &total_rows_rejected, &failed_count, &total_rows_rejected,
total_rows_completed); total_rows_completed);
...@@ -689,7 +656,6 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort ...@@ -689,7 +656,6 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed, char *abort
} }
pfree(results); pfree(results);
pfree(failedSegDBs);
/* If we are aborting the COPY, ignore errors sent by the server. */ /* If we are aborting the COPY, ignore errors sent by the server. */
if (edata && !abort_msg) if (edata && !abort_msg)
......
...@@ -25,18 +25,6 @@ ...@@ -25,18 +25,6 @@
struct CdbDispatcherState; struct CdbDispatcherState;
typedef enum SegDbState
{
/*
* (it is best to avoid names like OUT that are likely to be #define'd or
* typedef'd in some platform-dependent runtime library header file)
*/
SEGDB_OUT, /* Not participating in COPY (invalid etc...) */
SEGDB_IDLE, /* Participating but COPY not yet started */
SEGDB_COPY, /* COPY in progress */
SEGDB_DONE /* COPY completed (with or without errors) */
} SegDbState;
typedef struct CdbCopy typedef struct CdbCopy
{ {
int total_segs; /* total number of segments in cdb */ int total_segs; /* total number of segments in cdb */
...@@ -46,10 +34,7 @@ typedef struct CdbCopy ...@@ -46,10 +34,7 @@ typedef struct CdbCopy
* communicate with segDB's */ * communicate with segDB's */
bool skip_ext_partition;/* skip external partition */ bool skip_ext_partition;/* skip external partition */
SegDbState **segdb_state;
StringInfoData err_msg; /* error message for cdbcopy operations */ StringInfoData err_msg; /* error message for cdbcopy operations */
StringInfoData err_context; /* error context from QE error */
StringInfoData copy_out_buf;/* holds a chunk of data from the database */ StringInfoData copy_out_buf;/* holds a chunk of data from the database */
List *outseglist; /* segs that currently take part in copy out. List *outseglist; /* segs that currently take part in copy out.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册