提交 8d40268b 编写于 作者: A Adam Lee 提交者: Adam Lee

COPY: expand the type of numcompleted to 64 bits

Integer overflow occurs without this when copied more than 2^31 rows,
under the `COPY ON SEGMENT` mode.

Errors happen when it is casted to uint64, the type of `processed` in
`CopyStateData`, third-party Postgres driver, which takes it as an
int64, fails out of range.
上级 a1fcd70e
...@@ -449,7 +449,7 @@ processCopyEndResults(CdbCopy *c, ...@@ -449,7 +449,7 @@ processCopyEndResults(CdbCopy *c,
bool *first_error, bool *first_error,
int *failed_count, int *failed_count,
int *total_rows_rejected, int *total_rows_rejected,
int *total_rows_completed) int64 *total_rows_completed)
{ {
SegmentDatabaseDescriptor *q; SegmentDatabaseDescriptor *q;
int seg; int seg;
...@@ -702,7 +702,7 @@ cdbCopyEnd(CdbCopy *c) ...@@ -702,7 +702,7 @@ cdbCopyEnd(CdbCopy *c)
* and fetch the total number of rows completed by all QEs * and fetch the total number of rows completed by all QEs
*/ */
int int
cdbCopyEndAndFetchRejectNum(CdbCopy *c, int *total_rows_completed) cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed)
{ {
SegmentDatabaseDescriptor *q; SegmentDatabaseDescriptor *q;
SegmentDatabaseDescriptor **failedSegDBs; SegmentDatabaseDescriptor **failedSegDBs;
......
...@@ -293,7 +293,7 @@ ReportSrehResults(CdbSreh *cdbsreh, int total_rejected) ...@@ -293,7 +293,7 @@ ReportSrehResults(CdbSreh *cdbsreh, int total_rejected)
} }
static void static void
sendnumrows_internal(int numrejected, int numcompleted) sendnumrows_internal(int numrejected, int64 numcompleted)
{ {
StringInfoData buf; StringInfoData buf;
...@@ -304,7 +304,7 @@ sendnumrows_internal(int numrejected, int numcompleted) ...@@ -304,7 +304,7 @@ sendnumrows_internal(int numrejected, int numcompleted)
pq_sendint(&buf, numrejected, 4); pq_sendint(&buf, numrejected, 4);
if (numcompleted > 0) /* optional send completed num for COPY FROM if (numcompleted > 0) /* optional send completed num for COPY FROM
* ON SEGMENT */ * ON SEGMENT */
pq_sendint(&buf, numcompleted, 4); pq_sendint64(&buf, numcompleted);
pq_endmessage(&buf); pq_endmessage(&buf);
} }
...@@ -327,7 +327,7 @@ SendNumRowsRejected(int numrejected) ...@@ -327,7 +327,7 @@ SendNumRowsRejected(int numrejected)
* of rows that were rejected and completed in this last data load * of rows that were rejected and completed in this last data load
*/ */
void void
SendNumRows(int numrejected, int numcompleted) SendNumRows(int numrejected, int64 numcompleted)
{ {
sendnumrows_internal(numrejected, numcompleted); sendnumrows_internal(numrejected, numcompleted);
} }
...@@ -471,7 +471,6 @@ GetNextSegid(CdbSreh *cdbsreh) ...@@ -471,7 +471,6 @@ GetNextSegid(CdbSreh *cdbsreh)
return (cdbsreh->lastsegid++ % total_segs); return (cdbsreh->lastsegid++ % total_segs);
} }
/* /*
* This function is called when we are preparing to insert a bad row that * This function is called when we are preparing to insert a bad row that
* includes an encoding error into the bytea field of the error log file * includes an encoding error into the bytea field of the error log file
......
...@@ -3254,7 +3254,7 @@ CopyFromDispatch(CopyState cstate) ...@@ -3254,7 +3254,7 @@ CopyFromDispatch(CopyState cstate)
bool *nulls; bool *nulls;
int *attr_offsets; int *attr_offsets;
int total_rejected_from_qes = 0; int total_rejected_from_qes = 0;
int total_completed_from_qes = 0; int64 total_completed_from_qes = 0;
bool isnull; bool isnull;
bool *isvarlena; bool *isvarlena;
ResultRelInfo *resultRelInfo; ResultRelInfo *resultRelInfo;
......
...@@ -70,6 +70,6 @@ void cdbCopySendDataToAll(CdbCopy *c, const char *buffer, int nbytes); ...@@ -70,6 +70,6 @@ void cdbCopySendDataToAll(CdbCopy *c, const char *buffer, int nbytes);
void cdbCopySendData(CdbCopy *c, int target_seg, const char *buffer, int nbytes); void cdbCopySendData(CdbCopy *c, int target_seg, const char *buffer, int nbytes);
bool cdbCopyGetData(CdbCopy *c, bool cancel, uint64 *rows_processed); bool cdbCopyGetData(CdbCopy *c, bool cancel, uint64 *rows_processed);
int cdbCopyEnd(CdbCopy *c); int cdbCopyEnd(CdbCopy *c);
int cdbCopyEndAndFetchRejectNum(CdbCopy *c, int *total_rows_completed); int cdbCopyEndAndFetchRejectNum(CdbCopy *c, int64 *total_rows_completed);
#endif /* CDBCOPY_H */ #endif /* CDBCOPY_H */
...@@ -93,7 +93,7 @@ extern CdbSreh *makeCdbSreh(int rejectlimit, bool is_limit_in_rows, ...@@ -93,7 +93,7 @@ extern CdbSreh *makeCdbSreh(int rejectlimit, bool is_limit_in_rows,
extern void destroyCdbSreh(CdbSreh *cdbsreh); extern void destroyCdbSreh(CdbSreh *cdbsreh);
extern void HandleSingleRowError(CdbSreh *cdbsreh); extern void HandleSingleRowError(CdbSreh *cdbsreh);
extern void ReportSrehResults(CdbSreh *cdbsreh, int total_rejected); extern void ReportSrehResults(CdbSreh *cdbsreh, int total_rejected);
extern void SendNumRows(int numrejected, int numcompleted); extern void SendNumRows(int numrejected, int64 numcompleted);
extern void SendNumRowsRejected(int numrejected); extern void SendNumRowsRejected(int numrejected);
extern bool IsErrorTable(Relation rel); extern bool IsErrorTable(Relation rel);
extern void ErrorIfRejectLimitReached(CdbSreh *cdbsreh, CdbCopy *cdbCopy); extern void ErrorIfRejectLimitReached(CdbSreh *cdbsreh, CdbCopy *cdbCopy);
......
...@@ -85,7 +85,7 @@ pqParseInput3(PGconn *conn) ...@@ -85,7 +85,7 @@ pqParseInput3(PGconn *conn)
int avail; int avail;
#ifndef FRONTEND #ifndef FRONTEND
int numRejected = 0; int numRejected = 0;
int numCompleted = 0; int64 numCompleted = 0;
#endif #endif
...@@ -458,7 +458,7 @@ pqParseInput3(PGconn *conn) ...@@ -458,7 +458,7 @@ pqParseInput3(PGconn *conn)
conn->result->numRejected += numRejected; conn->result->numRejected += numRejected;
/* Optionally receive completed number when COPY FROM ON SEGMENT */ /* Optionally receive completed number when COPY FROM ON SEGMENT */
if (msgLength >= 8 && !pqGetInt(&numCompleted, 4, conn)) if (msgLength >= 8 && !pqGetInt64(&numCompleted, conn))
{ {
conn->result->numCompleted += numCompleted; conn->result->numCompleted += numCompleted;
} }
......
...@@ -243,7 +243,7 @@ struct pg_result ...@@ -243,7 +243,7 @@ struct pg_result
/* GPDB: number of rows rejected in SREH (protocol message 'j') */ /* GPDB: number of rows rejected in SREH (protocol message 'j') */
int numRejected; int numRejected;
/* GPDB: number of rows completed when COPY FROM ON SEGMENT */ /* GPDB: number of rows completed when COPY FROM ON SEGMENT */
int numCompleted; int64 numCompleted;
/* GPDB: number of processed tuples for each AO partition */ /* GPDB: number of processed tuples for each AO partition */
int naotupcounts; int naotupcounts;
PQaoRelTupCount *aotupcounts; PQaoRelTupCount *aotupcounts;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册