Commit 041d9399 authored by Pengzhou Tang, committed by Tang Pengzhou

Remove forceEos mechanism for TCP interconnect

In the TCP interconnect, the sender used to force an EOS message to the
receiver in two cases:
1. cancelUnfinished is true in mppExecutorFinishup.
2. an error occurs.

For case 1, the comment says: to finish a cursor, the QD used to send
a cancel to the QEs; the QEs then set the cancelUnfinished flag and did
a normal executor finish-up. We now use the QueryFinishPending mechanism
to stop a cursor, so the case 1 logic has been invalid for a long time.

For case 2, the purpose is: when an error occurs, we force an EOS to
the receiver so that the receiver does not report an interconnect error;
the QD will then check the dispatch results and report the errors from
the QEs. From the interconnect's point of view, we have selected to the
end of the query with no error in the interconnect. This logic has two
problems:
1. It doesn't work for an initplan: an initplan does not check the
dispatch results and rethrow the errors, so when an error occurs in the
QEs for the initplan, the QD cannot notice it (see the example after
this list).
2. It doesn't work for cursors, for example:
   DECLARE c1 cursor for select i from t1 where i / 0 = 1;
   FETCH all from c1;
   FETCH all from c1;
None of the FETCH commands reports an error, which is not expected.
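
For the initplan case, a minimal example (an illustrative sketch that
mirrors the regression test added in this commit; the table name
qe_errors_ic comes from that test) is a query whose initplan raises a
division-by-zero error on the QEs, which the QD must now surface:
   CREATE TABLE qe_errors_ic (a INT, b INT);
   INSERT INTO qe_errors_ic SELECT i, i FROM generate_series(1, 10) i;
   SELECT count(*) FROM qe_errors_ic
   GROUP BY a, b
   HAVING sum(a) > (SELECT max(a) FROM qe_errors_ic WHERE a/0 > 1);
   -- expected: ERROR: division by zero (reported from a segment)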

This commit removes the forceEos mechanism. For case 2, the receiver
will report an interconnect error without forceEos; this is acceptable
because when multiple errors are reported from the QEs, the QD is
inclined to report the non-interconnect error.
Parent 874939f4
......@@ -543,43 +543,23 @@ SetupInterconnect(EState *estate)
h->interconnect_context = estate->interconnect_context;
}
/*
* Move this out to separate stack frame, so that we don't have to mark
* tons of stuff volatile in TeardownInterconnect().
*/
void
forceEosToPeers(ChunkTransportState *transportStates,
int motNodeID)
{
if (!transportStates)
{
elog(FATAL, "no transport-states.");
}
transportStates->teardownActive = true;
transportStates->SendEos(transportStates, motNodeID, get_eos_tuplechunklist());
transportStates->teardownActive = false;
}
/* TeardownInterconnect() function is used to cleanup interconnect resources that
* were allocated during SetupInterconnect(). This function should ALWAYS be
* called after SetupInterconnect to avoid leaking resources (like sockets)
* even if SetupInterconnect did not complete correctly.
*/
void
TeardownInterconnect(ChunkTransportState *transportStates, bool forceEOS)
TeardownInterconnect(ChunkTransportState *transportStates, bool hasErrors)
{
interconnect_handle_t *h = find_interconnect_handle(transportStates);
if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
{
TeardownUDPIFCInterconnect(transportStates, forceEOS);
TeardownUDPIFCInterconnect(transportStates, hasErrors);
}
else if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
{
TeardownTCPInterconnect(transportStates, forceEOS);
TeardownTCPInterconnect(transportStates, hasErrors);
}
if (h != NULL)
......@@ -825,7 +805,7 @@ cleanup_interconnect_handle(interconnect_handle_t *h)
destroy_interconnect_handle(h);
return;
}
TeardownInterconnect(h->interconnect_context, true /* force EOS */);
TeardownInterconnect(h->interconnect_context, true);
}
static void
......
......@@ -1794,14 +1794,9 @@ SetupTCPInterconnect(EState *estate)
* This context is destroyed at the end of the query and all memory that gets
* allocated under it is free'd. We don't have to worry about pfree() but
* we definitely have to worry about socket resources.
*
* If forceEOS is set, we force end-of-stream notifications out any send-nodes,
* and we wrap that send in a PG_TRY/CATCH block because the goal is to reduce
* confusion (and if we're being shutdown abnormally anyhow, let's not start
* adding errors!).
*/
void
TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
TeardownTCPInterconnect(ChunkTransportState *transportStates, bool hasErrors)
{
ListCell *cell;
ChunkTransportStateEntry *pEntry = NULL;
......@@ -1819,7 +1814,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
* if we're already trying to clean up after an error -- don't allow
* signals to interrupt us
*/
if (forceEOS)
if (hasErrors)
HOLD_INTERRUPTS();
mySlice = &transportStates->sliceTable->slices[transportStates->sliceId];
......@@ -1829,7 +1824,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
{
int elevel = 0;
if (forceEOS || !transportStates->activated)
if (hasErrors || !transportStates->activated)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = LOG;
......@@ -1843,7 +1838,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
"%s; setup was %s",
GpIdentity.segindex, mySlice->sliceIndex,
forceEOS ? "force" : "normal",
hasErrors ? "error" : "normal",
transportStates->activated ? "completed" : "exited")));
/* if setup did not complete, log the slicetable */
......@@ -1919,9 +1914,6 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);
if (forceEOS)
forceEosToPeers(transportStates, mySlice->sliceIndex);
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
......@@ -2007,7 +1999,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
* If some errors are happening, senders can skip this step to avoid hangs;
* the QD will take care of the error handling.
*/
if (!forceEOS)
if (!hasErrors)
waitOnOutbound(pEntry);
for (i = 0; i < pEntry->numConns; i++)
......@@ -2038,7 +2030,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS)
pfree(transportStates->states);
pfree(transportStates);
if (forceEOS)
if (hasErrors)
RESUME_INTERRUPTS();
#ifdef AMS_VERBOSE_LOGGING
......
......@@ -703,7 +703,7 @@ static inline TupleChunkListItem RecvTupleChunkFromUDPIFC_Internal(ChunkTranspor
int16 motNodeID,
int16 srcRoute);
static void TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
bool forceEOS);
bool hasErrors);
static void freeDisorderedPackets(MotionConn *conn);
......@@ -3364,7 +3364,7 @@ computeNetworkStatistics(uint64 value, uint64 *min, uint64 *max, double *sum)
*/
static void
TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
bool forceEOS)
bool hasErrors)
{
ChunkTransportStateEntry *pEntry = NULL;
int i;
......@@ -3402,7 +3402,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
{
int elevel = 0;
if (forceEOS || !transportStates->activated)
if (hasErrors || !transportStates->activated)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = LOG;
......@@ -3416,7 +3416,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
"%s; setup was %s",
GpIdentity.segindex, mySlice->sliceIndex,
forceEOS ? "force" : "normal",
hasErrors ? "hasErrors" : "normal",
transportStates->activated ? "completed" : "exited")));
/* if setup did not complete, log the slicetable */
......@@ -3613,7 +3613,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
"isSender %d isReceiver %d "
"snd_queue_depth %d recv_queue_depth %d Gp_max_packet_size %d "
"UNACK_QUEUE_RING_SLOTS_NUM %d TIMER_SPAN %lld DEFAULT_RTT %d "
"forceEOS %d, gp_interconnect_id %d ic_id_last_teardown %d "
"hasErrors %d, gp_interconnect_id %d ic_id_last_teardown %d "
"snd_buffer_pool.count %d snd_buffer_pool.maxCount %d snd_sock_bufsize %d recv_sock_bufsize %d "
"snd_pkt_count %d retransmits %d crc_errors %d"
" recv_pkt_count %d recv_ack_num %d"
......@@ -3626,7 +3626,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
ic_control_info.isSender, isReceiver,
Gp_interconnect_snd_queue_depth, Gp_interconnect_queue_depth, Gp_max_packet_size,
UNACK_QUEUE_RING_SLOTS_NUM, TIMER_SPAN, DEFAULT_RTT,
forceEOS, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId,
hasErrors, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId,
snd_buffer_pool.count, snd_buffer_pool.maxCount, ic_control_info.socketSendBufferSize, ic_control_info.socketRecvBufferSize,
ic_statistics.sndPktNum, ic_statistics.retransmits, ic_statistics.crcErrors,
ic_statistics.recvPktNum, ic_statistics.recvAckNum,
......@@ -3672,11 +3672,11 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
*/
void
TeardownUDPIFCInterconnect(ChunkTransportState *transportStates,
bool forceEOS)
bool hasErrors)
{
PG_TRY();
{
TeardownUDPIFCInterconnect_Internal(transportStates, forceEOS);
TeardownUDPIFCInterconnect_Internal(transportStates, hasErrors);
Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
}
......
......@@ -1595,14 +1595,7 @@ void mppExecutorFinishup(QueryDesc *queryDesc)
/* Teardown the Interconnect */
if (estate->es_interconnect_is_setup)
{
/*
* MPP-3413: If we got here during cancellation of a cursor,
* we need to set the "forceEos" argument correctly --
* otherwise we potentially hang (cursors cancel on the QEs,
* mark the estate to "cancelUnfinished" and then try to do a
* normal interconnect teardown).
*/
TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished);
TeardownInterconnect(estate->interconnect_context, false);
estate->interconnect_context = NULL;
estate->es_interconnect_is_setup = false;
}
......@@ -1690,7 +1683,7 @@ void mppExecutorCleanup(QueryDesc *queryDesc)
/* Clean up the interconnect. */
if (estate->es_interconnect_is_setup)
{
TeardownInterconnect(estate->interconnect_context, true /* force EOS */);
TeardownInterconnect(estate->interconnect_context, true);
estate->es_interconnect_is_setup = false;
}
......
......@@ -1293,9 +1293,18 @@ PG_TRY();
queryDesc->estate->dispatcherState &&
queryDesc->estate->dispatcherState->primaryResults)
{
ErrorData *qeError = NULL;
CdbDispatcherState *ds = queryDesc->estate->dispatcherState;
cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);
cdbdisp_getDispatchResults(ds, &qeError);
if (qeError)
{
queryDesc->estate->dispatcherState = NULL;
FlushErrorState();
ReThrowError(qeError);
}
/* If EXPLAIN ANALYZE, collect execution stats from qExecs. */
if (planstate->instrument && planstate->instrument->need_cdb)
......
......@@ -117,7 +117,7 @@ extern void SetupInterconnect(struct EState *estate);
*
*/
extern void TeardownInterconnect(ChunkTransportState *transportStates,
bool forceEOS);
bool hasErrors);
extern void WaitInterconnectQuit(void);
......@@ -304,9 +304,6 @@ extern ChunkTransportStateEntry *createChunkTransportState(ChunkTransportState *
extern ChunkTransportStateEntry *removeChunkTransportState(ChunkTransportState *transportStates,
int16 motNodeID);
extern void forceEosToPeers(ChunkTransportState *transportStates,
int motNodeID);
extern TupleChunkListItem RecvTupleChunk(MotionConn *conn, ChunkTransportState *transportStates);
extern void InitMotionTCP(int *listenerSocketFd, uint16 *listenerPort);
......@@ -318,9 +315,9 @@ extern void WaitInterconnectQuitUDPIFC(void);
extern void SetupTCPInterconnect(EState *estate);
extern void SetupUDPIFCInterconnect(EState *estate);
extern void TeardownTCPInterconnect(ChunkTransportState *transportStates,
bool forceEOS);
bool hasErrors);
extern void TeardownUDPIFCInterconnect(ChunkTransportState *transportStates,
bool forceEOS);
bool hasErrors);
extern uint32 getActiveMotionConns(void);
......
......@@ -557,3 +557,33 @@ SELECT * FROM y LIMIT 10;
(1 row)
DROP TABLE recursive_table_ic;
-- Test QD can notice the errors in QEs for initplan
CREATE TABLE qe_errors_ic (a INT, b INT);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
INSERT INTO qe_errors_ic SELECT i, i FROM generate_series(1, 10) i;
SELECT count(*) FROM qe_errors_ic
GROUP BY a, b
HAVING sum(a) > (SELECT max(a) FROM qe_errors_ic WHERE a/0 > 1);
ERROR: division by zero (seg2 slice3 127.0.0.1:7004 pid=20627)
-- Test QD can notice the errors in QEs for cursors
-- In the past, the DECLARE and FETCH commands below had a chance to
-- report no errors, which was not expected; we expect either DECLARE
-- or FETCH to report the 'division by zero' error.
--
-- In TCP interconnect mode, either DECLARE or FETCH may report the
-- 'division by zero' error, depending on how fast the QD and QEs set
-- up the interconnect, so ignore the output of DECLARE and FETCH and
-- verify the test case by checking that the following commands in the
-- transaction fail.
BEGIN;
--start_ignore
DECLARE qe_errors_cursor CURSOR FOR SELECT * FROM qe_errors_ic WHERE qe_errors_ic.b / 0 >1;
ERROR: division by zero (seg0 slice1 127.0.0.1:7002 pid=20667)
FETCH ALL FROM qe_errors_cursor;
ERROR: current transaction is aborted, commands ignored until end of transaction block
--end_ignore
select 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
DROP TABLE qe_errors_ic;
......@@ -245,3 +245,30 @@ y(i) AS (
)
SELECT * FROM y LIMIT 10;
DROP TABLE recursive_table_ic;
-- Test QD can notice the errors in QEs for initplan
CREATE TABLE qe_errors_ic (a INT, b INT);
INSERT INTO qe_errors_ic SELECT i, i FROM generate_series(1, 10) i;
SELECT count(*) FROM qe_errors_ic
GROUP BY a, b
HAVING sum(a) > (SELECT max(a) FROM qe_errors_ic WHERE a/0 > 1);
-- Test QD can notice the errors in QEs for cursors
-- In the past, the DECLARE and FETCH commands below had a chance to
-- report no errors, which was not expected; we expect either DECLARE
-- or FETCH to report the 'division by zero' error.
--
-- In TCP interconnect mode, either DECLARE or FETCH may report the
-- 'division by zero' error, depending on how fast the QD and QEs set
-- up the interconnect, so ignore the output of DECLARE and FETCH and
-- verify the test case by checking that the following commands in the
-- transaction fail.
BEGIN;
--start_ignore
DECLARE qe_errors_cursor CURSOR FOR SELECT * FROM qe_errors_ic WHERE qe_errors_ic.b / 0 >1;
FETCH ALL FROM qe_errors_cursor;
--end_ignore
select 1;
ROLLBACK;
DROP TABLE qe_errors_ic;