diff --git a/src/backend/cdb/motion/ic_udpifc.c b/src/backend/cdb/motion/ic_udpifc.c index 8cae59d360af02588cf80005ed20f536ad10e048..2ac596b5c3bdf732545f8fb28df5d4587e6ff803 100644 --- a/src/backend/cdb/motion/ic_udpifc.c +++ b/src/backend/cdb/motion/ic_udpifc.c @@ -4711,9 +4711,6 @@ handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *p static void sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn) { - if (!conn->stillActive) - return; - while (conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0) { ICBuffer *buf = NULL; @@ -5413,7 +5410,6 @@ SendChunkUDPIFC(ChunkTransportState *transportStates, bool doCheckExpiration = false; bool gotStops = false; - Assert(conn->stillActive); Assert(conn->msgSize > 0); #ifdef AMS_VERBOSE_LOGGING @@ -5460,12 +5456,6 @@ SendChunkUDPIFC(ChunkTransportState *transportStates, { int timeout = (doCheckExpiration ? 0 : computeTimeout(conn, retry)); - if (QueryFinishPending) - { - conn->stillActive = false; - return false; - } - if (pollAcks(transportStates, pEntry->txfd, timeout)) { if (handleAcks(transportStates, pEntry)) @@ -5612,12 +5602,6 @@ SendEosUDPIFC(ChunkTransportState *transportStates, retry = 0; ic_control_info.lastPacketSendTime = 0; - if (QueryFinishPending) - { - conn->stillActive = false; - continue; - } - /* wait until this queue is emptied */ while (icBufferListLength(&conn->unackQueue) > 0 || icBufferListLength(&conn->sndQueue) > 0) diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index bfd0d5399d76e5ba24bf89bcf576be9ac99436be..6bac06c9ed5ab040ae9ddc60cc512a9b47c5c83b 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -2124,6 +2124,21 @@ void mppExecutorFinishup(QueryDesc *queryDesc) currentSlice = getCurrentSlice(estate, LocallyExecutingSliceIndex(estate)); + /* Teardown the Interconnect */ + if (estate->es_interconnect_is_setup) + { + /* + * MPP-3413: If we got here during cancellation of a cursor, + * we need to set the "forceEos" argument correctly -- + * otherwise we potentially hang (cursors cancel on the QEs, + * mark the estate to "cancelUnfinished" and then try to do a + * normal interconnect teardown). + */ + TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished, false); + estate->interconnect_context = NULL; + estate->es_interconnect_is_setup = false; + } + /* * If QD, wait for QEs to finish and check their results. */ @@ -2242,20 +2257,6 @@ void mppExecutorFinishup(QueryDesc *queryDesc) estate->dispatcherState = NULL; cdbdisp_destroyDispatcherState(ds); } - - /* Teardown the Interconnect */ - if (estate->es_interconnect_is_setup) - { - /* - * MPP-3413: If we got here during cancellation of a cursor, - * we need to set the "forceEos" argument correctly -- - * otherwise we potentially hang (cursors cancel on the QEs, - * mark the estate to "cancelUnfinished" and then try to do a - * normal interconnect teardown). - */ - TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished, false); - estate->es_interconnect_is_setup = false; - } } /* @@ -2275,6 +2276,12 @@ void mppExecutorCleanup(QueryDesc *queryDesc) /* GPDB hook for collecting query info */ if (query_info_collect_hook && QueryCancelCleanup) (*query_info_collect_hook)(METRICS_QUERY_CANCELING, queryDesc); + /* Clean up the interconnect. */ + if (estate->es_interconnect_is_setup) + { + TeardownInterconnect(estate->interconnect_context, true /* force EOS */, true); + estate->es_interconnect_is_setup = false; + } /* * If this query is being canceled, record that when the gpperfmon @@ -2299,13 +2306,6 @@ void mppExecutorCleanup(QueryDesc *queryDesc) CdbDispatchHandleError(ds); } - /* Clean up the interconnect. */ - if (estate->es_interconnect_is_setup) - { - TeardownInterconnect(estate->interconnect_context, true /* force EOS */, true); - estate->es_interconnect_is_setup = false; - } - /* GPDB hook for collecting query info */ if (query_info_collect_hook) (*query_info_collect_hook)(QueryCancelCleanup ? METRICS_QUERY_CANCELED : METRICS_QUERY_ERROR, queryDesc); diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c index 83adf3427e69ea3c1b737b53f88163d751b2afec..b3e3fcebab04ffd54dfa5cb5283b195576490599 100644 --- a/src/backend/executor/nodeSubplan.c +++ b/src/backend/executor/nodeSubplan.c @@ -1211,6 +1211,15 @@ PG_TRY(); prm->isnull = false; } + /* Clean up the interconnect. */ + if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup) + { + TeardownInterconnect(queryDesc->estate->interconnect_context, + false, false); /* following success on QD */ + queryDesc->estate->interconnect_context = NULL; + queryDesc->estate->es_interconnect_is_setup = false; + } + /* * If we dispatched to QEs, wait for completion. */ @@ -1221,15 +1230,7 @@ PG_TRY(); { CdbDispatcherState *ds = queryDesc->estate->dispatcherState; - /* - * We are in a subplan, the eflags always contains EXEC_FLAG_REWIND which - * means we cannot squelch the motion node earlier and some QEs still keep - * sending tuples. - * - * we get all the tuples we needed, DISPATCH_WAIT_FINISH tell QEs stopping - * sending tuples and wait them to complete. - */ - cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_FINISH); + cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE); /* If EXPLAIN ANALYZE, collect execution stats from qExecs. */ if (planstate->instrument && planstate->instrument->need_cdb) @@ -1245,15 +1246,6 @@ PG_TRY(); queryDesc->estate->dispatcherState = NULL; cdbdisp_destroyDispatcherState(ds); } - - /* Clean up the interconnect. */ - if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup) - { - TeardownInterconnect(queryDesc->estate->interconnect_context, - false, false); /* following success on QD */ - queryDesc->estate->interconnect_context = NULL; - queryDesc->estate->es_interconnect_is_setup = false; - } } PG_CATCH(); { @@ -1279,6 +1271,18 @@ PG_CATCH(); /* Restore memory high-water mark for root slice of main query. */ MemoryContextSetPeakSpace(planstate->state->es_query_cxt, savepeakspace); + /* + * Clean up the interconnect. + * CDB TODO: Is this needed following failure on QD? + */ + if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup) + { + TeardownInterconnect(queryDesc->estate->interconnect_context, + true, false); + queryDesc->estate->interconnect_context = NULL; + queryDesc->estate->es_interconnect_is_setup = false; + } + /* * Request any commands still executing on qExecs to stop. * Wait for them to finish and clean up the dispatching structures. @@ -1292,17 +1296,6 @@ PG_CATCH(); CdbDispatchHandleError(ds); } - /* - * Clean up the interconnect. - * CDB TODO: Is this needed following failure on QD? - */ - if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup) - { - TeardownInterconnect(queryDesc->estate->interconnect_context, - true, false); - queryDesc->estate->interconnect_context = NULL; - queryDesc->estate->es_interconnect_is_setup = false; - } PG_RE_THROW(); } PG_END_TRY();