From 041d9399bba4d55d426156651d2a4e0fc771a3e9 Mon Sep 17 00:00:00 2001 From: Pengzhou Tang Date: Thu, 16 Apr 2020 23:55:21 -0400 Subject: [PATCH] Remove forceEos mechanism for TCP interconnect In TCP interconnect, the sender used to force an EOS message to the receiver in two cases: 1. cancelUnfinished is true in mppExecutorFinishup. 2. an error occurs. For case1, the comment says: to finish a cursor, the QD used to send a cancel to the QEs, QEs then set the cancelUnfinished flag and did a normal executor finish up. We now use the QueryFinishPending mechanism to stop a cursor, so the case1 logic has been invalid for a long time. For case2, the purpose is: when an error occurs, we force an EOS to the receiver so that the receiver does not report an interconnect error, and the QD will then check the dispatch results and report the errors in the QEs. From the view of interconnect, we have selected to the end of the query and there is no error in the interconnect. This logic has two problems: 1. it doesn't work for initplan, initplan will not check the dispatch results and throw the errors, so when an error occurs in the QEs for the initplan, the QD cannot notice that. 2. it doesn't work for cursors, for example: DECLARE c1 cursor for select i from t1 where i / 0 = 1; FETCH all from c1; FETCH all from c1; All FETCH commands don't report errors, which is not expected. This commit removes the forceEos mechanism. For case2, the receiver will report an interconnect error without forceEos; this is ok because when multiple errors are reported from QEs, the QD is inclined to report the non-interconnect error. 
--- src/backend/cdb/motion/ic_common.c | 28 ++++------------------------ src/backend/cdb/motion/ic_tcp.c | 20 ++++++-------------- src/backend/cdb/motion/ic_udpifc.c | 16 ++++++++-------- src/backend/executor/execUtils.c | 11 ++--------- src/backend/executor/nodeSubplan.c | 9 +++++++++ src/include/cdb/ml_ipc.h | 9 +++------ src/test/regress/expected/ic.out | 30 ++++++++++++++++++++++++++++++ src/test/regress/sql/ic.sql | 27 +++++++++++++++++++++++++++ 8 files changed, 89 insertions(+), 61 deletions(-) diff --git a/src/backend/cdb/motion/ic_common.c b/src/backend/cdb/motion/ic_common.c index d7d41275a2..f78149e9f4 100644 --- a/src/backend/cdb/motion/ic_common.c +++ b/src/backend/cdb/motion/ic_common.c @@ -543,43 +543,23 @@ SetupInterconnect(EState *estate) h->interconnect_context = estate->interconnect_context; } -/* - * Move this out to separate stack frame, so that we don't have to mark - * tons of stuff volatile in TeardownInterconnect(). - */ -void -forceEosToPeers(ChunkTransportState *transportStates, - int motNodeID) -{ - if (!transportStates) - { - elog(FATAL, "no transport-states."); - } - - transportStates->teardownActive = true; - - transportStates->SendEos(transportStates, motNodeID, get_eos_tuplechunklist()); - - transportStates->teardownActive = false; -} - /* TeardownInterconnect() function is used to cleanup interconnect resources that * were allocated during SetupInterconnect(). This function should ALWAYS be * called after SetupInterconnect to avoid leaking resources (like sockets) * even if SetupInterconnect did not complete correctly. 
*/ void -TeardownInterconnect(ChunkTransportState *transportStates, bool forceEOS) +TeardownInterconnect(ChunkTransportState *transportStates, bool hasErrors) { interconnect_handle_t *h = find_interconnect_handle(transportStates); if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC) { - TeardownUDPIFCInterconnect(transportStates, forceEOS); + TeardownUDPIFCInterconnect(transportStates, hasErrors); } else if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP) { - TeardownTCPInterconnect(transportStates, forceEOS); + TeardownTCPInterconnect(transportStates, hasErrors); } if (h != NULL) @@ -825,7 +805,7 @@ cleanup_interconnect_handle(interconnect_handle_t *h) destroy_interconnect_handle(h); return; } - TeardownInterconnect(h->interconnect_context, true /* force EOS */); + TeardownInterconnect(h->interconnect_context, true); } static void diff --git a/src/backend/cdb/motion/ic_tcp.c b/src/backend/cdb/motion/ic_tcp.c index 7261a27cad..598da07fe1 100644 --- a/src/backend/cdb/motion/ic_tcp.c +++ b/src/backend/cdb/motion/ic_tcp.c @@ -1794,14 +1794,9 @@ SetupTCPInterconnect(EState *estate) * This context is destroyed at the end of the query and all memory that gets * allocated under it is free'd. We don't have have to worry about pfree() but * we definitely have to worry about socket resources. - * - * If forceEOS is set, we force end-of-stream notifications out any send-nodes, - * and we wrap that send in a PG_TRY/CATCH block because the goal is to reduce - * confusion (and if we're being shutdown abnormally anyhow, let's not start - * adding errors!). 
*/ void -TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) +TeardownTCPInterconnect(ChunkTransportState *transportStates, bool hasErrors) { ListCell *cell; ChunkTransportStateEntry *pEntry = NULL; @@ -1819,7 +1814,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) * if we're already trying to clean up after an error -- don't allow * signals to interrupt us */ - if (forceEOS) + if (hasErrors) HOLD_INTERRUPTS(); mySlice = &transportStates->sliceTable->slices[transportStates->sliceId]; @@ -1829,7 +1824,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) { int elevel = 0; - if (forceEOS || !transportStates->activated) + if (hasErrors || !transportStates->activated) { if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) elevel = LOG; @@ -1843,7 +1838,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: " "%s; setup was %s", GpIdentity.segindex, mySlice->sliceIndex, - forceEOS ? "force" : "normal", + hasErrors ? "error" : "normal", transportStates->activated ? "completed" : "exited"))); /* if setup did not complete, log the slicetable */ @@ -1919,9 +1914,6 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry); - if (forceEOS) - forceEosToPeers(transportStates, mySlice->sliceIndex); - for (i = 0; i < pEntry->numConns; i++) { conn = pEntry->conns + i; @@ -2007,7 +1999,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) * If some errors are happening, senders can skip this step to avoid hung * issues, QD will take care of the error handling. 
*/ - if (!forceEOS) + if (!hasErrors) waitOnOutbound(pEntry); for (i = 0; i < pEntry->numConns; i++) @@ -2038,7 +2030,7 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates, bool forceEOS) pfree(transportStates->states); pfree(transportStates); - if (forceEOS) + if (hasErrors) RESUME_INTERRUPTS(); #ifdef AMS_VERBOSE_LOGGING diff --git a/src/backend/cdb/motion/ic_udpifc.c b/src/backend/cdb/motion/ic_udpifc.c index 6565497594..a22740d3d9 100644 --- a/src/backend/cdb/motion/ic_udpifc.c +++ b/src/backend/cdb/motion/ic_udpifc.c @@ -703,7 +703,7 @@ static inline TupleChunkListItem RecvTupleChunkFromUDPIFC_Internal(ChunkTranspor int16 motNodeID, int16 srcRoute); static void TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, - bool forceEOS); + bool hasErrors); static void freeDisorderedPackets(MotionConn *conn); @@ -3364,7 +3364,7 @@ computeNetworkStatistics(uint64 value, uint64 *min, uint64 *max, double *sum) */ static void TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, - bool forceEOS) + bool hasErrors) { ChunkTransportStateEntry *pEntry = NULL; int i; @@ -3402,7 +3402,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, { int elevel = 0; - if (forceEOS || !transportStates->activated) + if (hasErrors || !transportStates->activated) { if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) elevel = LOG; @@ -3416,7 +3416,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: " "%s; setup was %s", GpIdentity.segindex, mySlice->sliceIndex, - forceEOS ? "force" : "normal", + hasErrors ? "hasErrors" : "normal", transportStates->activated ? 
"completed" : "exited"))); /* if setup did not complete, log the slicetable */ @@ -3613,7 +3613,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, "isSender %d isReceiver %d " "snd_queue_depth %d recv_queue_depth %d Gp_max_packet_size %d " "UNACK_QUEUE_RING_SLOTS_NUM %d TIMER_SPAN %lld DEFAULT_RTT %d " - "forceEOS %d, gp_interconnect_id %d ic_id_last_teardown %d " + "hasErrors %d, gp_interconnect_id %d ic_id_last_teardown %d " "snd_buffer_pool.count %d snd_buffer_pool.maxCount %d snd_sock_bufsize %d recv_sock_bufsize %d " "snd_pkt_count %d retransmits %d crc_errors %d" " recv_pkt_count %d recv_ack_num %d" @@ -3626,7 +3626,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, ic_control_info.isSender, isReceiver, Gp_interconnect_snd_queue_depth, Gp_interconnect_queue_depth, Gp_max_packet_size, UNACK_QUEUE_RING_SLOTS_NUM, TIMER_SPAN, DEFAULT_RTT, - forceEOS, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId, + hasErrors, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId, snd_buffer_pool.count, snd_buffer_pool.maxCount, ic_control_info.socketSendBufferSize, ic_control_info.socketRecvBufferSize, ic_statistics.sndPktNum, ic_statistics.retransmits, ic_statistics.crcErrors, ic_statistics.recvPktNum, ic_statistics.recvAckNum, @@ -3672,11 +3672,11 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, */ void TeardownUDPIFCInterconnect(ChunkTransportState *transportStates, - bool forceEOS) + bool hasErrors) { PG_TRY(); { - TeardownUDPIFCInterconnect_Internal(transportStates, forceEOS); + TeardownUDPIFCInterconnect_Internal(transportStates, hasErrors); Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0); } diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 26980b0501..a2b23a8d36 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -1595,14 +1595,7 @@ void 
mppExecutorFinishup(QueryDesc *queryDesc) /* Teardown the Interconnect */ if (estate->es_interconnect_is_setup) { - /* - * MPP-3413: If we got here during cancellation of a cursor, - * we need to set the "forceEos" argument correctly -- - * otherwise we potentially hang (cursors cancel on the QEs, - * mark the estate to "cancelUnfinished" and then try to do a - * normal interconnect teardown). - */ - TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished); + TeardownInterconnect(estate->interconnect_context, false); estate->interconnect_context = NULL; estate->es_interconnect_is_setup = false; } @@ -1690,7 +1683,7 @@ void mppExecutorCleanup(QueryDesc *queryDesc) /* Clean up the interconnect. */ if (estate->es_interconnect_is_setup) { - TeardownInterconnect(estate->interconnect_context, true /* force EOS */); + TeardownInterconnect(estate->interconnect_context, true); estate->es_interconnect_is_setup = false; } diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c index 6866c1a8b8..115611939c 100644 --- a/src/backend/executor/nodeSubplan.c +++ b/src/backend/executor/nodeSubplan.c @@ -1293,9 +1293,18 @@ PG_TRY(); queryDesc->estate->dispatcherState && queryDesc->estate->dispatcherState->primaryResults) { + ErrorData *qeError = NULL; CdbDispatcherState *ds = queryDesc->estate->dispatcherState; cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE); + cdbdisp_getDispatchResults(ds, &qeError); + + if (qeError) + { + queryDesc->estate->dispatcherState = NULL; + FlushErrorState(); + ReThrowError(qeError); + } /* If EXPLAIN ANALYZE, collect execution stats from qExecs. 
*/ if (planstate->instrument && planstate->instrument->need_cdb) diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h index 4e1f808058..1f81064cc3 100644 --- a/src/include/cdb/ml_ipc.h +++ b/src/include/cdb/ml_ipc.h @@ -117,7 +117,7 @@ extern void SetupInterconnect(struct EState *estate); * */ extern void TeardownInterconnect(ChunkTransportState *transportStates, - bool forceEOS); + bool hasErrors); extern void WaitInterconnectQuit(void); @@ -304,9 +304,6 @@ extern ChunkTransportStateEntry *createChunkTransportState(ChunkTransportState * extern ChunkTransportStateEntry *removeChunkTransportState(ChunkTransportState *transportStates, int16 motNodeID); -extern void forceEosToPeers(ChunkTransportState *transportStates, - int motNodeID); - extern TupleChunkListItem RecvTupleChunk(MotionConn *conn, ChunkTransportState *transportStates); extern void InitMotionTCP(int *listenerSocketFd, uint16 *listenerPort); @@ -318,9 +315,9 @@ extern void WaitInterconnectQuitUDPIFC(void); extern void SetupTCPInterconnect(EState *estate); extern void SetupUDPIFCInterconnect(EState *estate); extern void TeardownTCPInterconnect(ChunkTransportState *transportStates, - bool forceEOS); + bool hasErrors); extern void TeardownUDPIFCInterconnect(ChunkTransportState *transportStates, - bool forceEOS); + bool hasErrors); extern uint32 getActiveMotionConns(void); diff --git a/src/test/regress/expected/ic.out b/src/test/regress/expected/ic.out index 64c0463975..af50233107 100644 --- a/src/test/regress/expected/ic.out +++ b/src/test/regress/expected/ic.out @@ -557,3 +557,33 @@ SELECT * FROM y LIMIT 10; (1 row) DROP TABLE recursive_table_ic; +-- Test QD can notice the errors in QEs for initplan +CREATE TABLE qe_errors_ic (a INT, b INT); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. +INSERT INTO qe_errors_ic SELECT i, i FROM generate_series(1, 10) i; +SELECT count(*) FROM qe_errors_ic +GROUP BY a, b +HAVING sum(a) > (SELECT max(a) FROM qe_errors_ic WHERE a/0 > 1); +ERROR: division by zero (seg2 slice3 127.0.0.1:7004 pid=20627) +-- Test QD can notice the errors in QEs for cursors +-- In past, bellow DECLARE and FETCH commands had chances to report +-- no errors, it was not expected, we expect either DECLARE or FETCH +-- to report 'division by zero' errors. +-- +-- In TCP interconnect mode, DECLARE or FETCH all have chance to +-- report 'division by zero' errors, it depends on the speed of QD +-- and QEs to set up interconnect, so ignore the output of DECLARE +-- and FETCH, we verify the test case by checking the fact that the +-- following commands in the transaction will failed. +BEGIN; +--start_ignore +DECLARE qe_errors_cursor CURSOR FOR SELECT * FROM qe_errors_ic WHERE qe_errors_ic.b / 0 >1; +ERROR: division by zero (seg0 slice1 127.0.0.1:7002 pid=20667) +FETCH ALL FROM qe_errors_cursor; +ERROR: current transaction is aborted, commands ignored until end of transaction block +--end_ignore +select 1; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +DROP TABLE qe_errors_ic; diff --git a/src/test/regress/sql/ic.sql b/src/test/regress/sql/ic.sql index db3cc1ddf3..d5a98e4c5b 100644 --- a/src/test/regress/sql/ic.sql +++ b/src/test/regress/sql/ic.sql @@ -245,3 +245,30 @@ y(i) AS ( ) SELECT * FROM y LIMIT 10; DROP TABLE recursive_table_ic; + +-- Test QD can notice the errors in QEs for initplan +CREATE TABLE qe_errors_ic (a INT, b INT); +INSERT INTO qe_errors_ic SELECT i, i FROM generate_series(1, 10) i; +SELECT count(*) FROM qe_errors_ic +GROUP BY a, b +HAVING sum(a) > (SELECT max(a) FROM qe_errors_ic WHERE a/0 > 1); + +-- Test QD can notice the errors in QEs for cursors +-- In past, bellow DECLARE and FETCH commands 
had chances to report +-- no errors, it was not expected, we expect either DECLARE or FETCH +-- to report 'division by zero' errors. +-- +-- In TCP interconnect mode, DECLARE or FETCH all have chance to +-- report 'division by zero' errors, it depends on the speed of QD +-- and QEs to set up interconnect, so ignore the output of DECLARE +-- and FETCH, we verify the test case by checking the fact that the +-- following commands in the transaction will failed. +BEGIN; +--start_ignore +DECLARE qe_errors_cursor CURSOR FOR SELECT * FROM qe_errors_ic WHERE qe_errors_ic.b / 0 >1; +FETCH ALL FROM qe_errors_cursor; +--end_ignore +select 1; +ROLLBACK; + +DROP TABLE qe_errors_ic; -- GitLab