Commit 7c90c04f authored by Pengzhou Tang, committed by Tang Pengzhou

Fix interconnect hang issue

We have hit the interconnect hang issue many times, and the cases all
share the same pattern: the downstream interconnect motion senders keep
sending tuples, blind to the fact that the upstream nodes have already
finished and quit execution; the QD then has enough tuples and waits
for all QEs to quit, which causes a deadlock.
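A minimal sketch of the two sides of the deadlock; have_more_tuples()
and SendTuple() are illustrative placeholders, not the actual GPDB
functions:

    /* QE side: the motion sender keeps pushing tuples and eventually
     * blocks waiting for ACKs that the departed receiver never sends. */
    while (have_more_tuples())
        SendTuple(conn, next_tuple());

    /* QD side: it already has all the tuples it needs and now blocks
     * waiting for every QE to finish -- which the stuck sender never does. */
    cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);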

Many nodes may quit execution early, e.g. LIMIT, HashJoin, NestLoop.
To resolve the hang issue, they need to stop the interconnect stream
explicitly by calling ExecSquelchNode(); however, we cannot do that in
rescan cases, where data might be lost, e.g. commit 2c011ce4. For
rescan cases, we tried using QueryFinishPending to stop the senders in
commit 02213a73, letting the senders check this flag and quit. That
commit has its own problems: first, QueryFinishPending can only be set
by the QD, so it does not work for INSERT or UPDATE cases; second, it
only lets the senders detect the flag and quit the loop in a rude way
(without sending an EOS to their receivers), so a receiver may still be
stuck receiving tuples.

This commit first reverts the QueryFinishPending method.

To resolve the hang issue, we move TeardownInterconnect ahead of
cdbdisp_checkDispatchResult, which guarantees that the interconnect
stream is stopped before we wait for the QEs and check their status.
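Schematically, the new ordering in mppExecutorFinishup() looks like
this (condensed from the diff below; the wait call here is abbreviated,
not the verbatim source):

    /* Teardown the interconnect first, so no motion sender can still
     * be blocked streaming tuples into it... */
    if (estate->es_interconnect_is_setup)
    {
        TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished, false);
        estate->interconnect_context = NULL;
        estate->es_interconnect_is_setup = false;
    }

    /* ...and only then wait for the QEs and check their results. */
    cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);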

For UDPIFC, TeardownInterconnect() removes the ic entries, so any
packets for this interconnect context are treated as 'past' packets
and are acked with the STOP flag.
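A hedged sketch of that receiver-side handling; lookupConnByHeader()
and sendAck() are illustrative stand-ins for the real UDPIFC routines,
and the flag names only follow the UDPIC_FLAGS_* naming convention used
in ic_udpifc.c:

    /* A packet arrives for an interconnect context that has already been
     * torn down: no ic entry matches any more, so treat it as a 'past'
     * packet and ack it with the STOP flag to make the sender give up. */
    conn = lookupConnByHeader(pkt);     /* hypothetical lookup */
    if (conn == NULL)
    {
        sendAck(pkt, UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK);  /* hypothetical */
        return;
    }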

For TCP, TeardownInterconnect() closes all connections with its
children; the children treat any readable event on the connection,
including the closure itself, as a STOP message.
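A sketch of what a child sender sees on its side after the parent
closes the socket; this is illustrative poll(2) usage, not the actual
TCP interconnect code:

    struct pollfd pfd = { .fd = conn->sockfd, .events = POLLIN };

    /* Any readable event -- a STOP message, or EOF from the closure --
     * tells this sender to stop streaming tuples. */
    if (poll(&pfd, 1, timeout) > 0 && (pfd.revents & (POLLIN | POLLHUP)))
        conn->stillActive = false;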

A test case is not included; both commits 2c011ce4 and 02213a73
contain one.
Parent 48ffabce
@@ -4711,9 +4711,6 @@ handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *p
 static void
 sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
 {
-	if (!conn->stillActive)
-		return;
-
 	while (conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
 	{
 		ICBuffer *buf = NULL;
@@ -5413,7 +5410,6 @@ SendChunkUDPIFC(ChunkTransportState *transportStates,
 	bool		doCheckExpiration = false;
 	bool		gotStops = false;
 
-	Assert(conn->stillActive);
 	Assert(conn->msgSize > 0);
 
 #ifdef AMS_VERBOSE_LOGGING
@@ -5460,12 +5456,6 @@ SendChunkUDPIFC(ChunkTransportState *transportStates,
 	{
 		int			timeout = (doCheckExpiration ? 0 : computeTimeout(conn, retry));
 
-		if (QueryFinishPending)
-		{
-			conn->stillActive = false;
-			return false;
-		}
-
 		if (pollAcks(transportStates, pEntry->txfd, timeout))
 		{
 			if (handleAcks(transportStates, pEntry))
@@ -5612,12 +5602,6 @@ SendEosUDPIFC(ChunkTransportState *transportStates,
 		retry = 0;
 		ic_control_info.lastPacketSendTime = 0;
 
-		if (QueryFinishPending)
-		{
-			conn->stillActive = false;
-			continue;
-		}
-
 		/* wait until this queue is emptied */
 		while (icBufferListLength(&conn->unackQueue) > 0 ||
 			   icBufferListLength(&conn->sndQueue) > 0)
@@ -2124,6 +2124,21 @@ void mppExecutorFinishup(QueryDesc *queryDesc)
 	currentSlice = getCurrentSlice(estate, LocallyExecutingSliceIndex(estate));
 
+	/* Teardown the Interconnect */
+	if (estate->es_interconnect_is_setup)
+	{
+		/*
+		 * MPP-3413: If we got here during cancellation of a cursor,
+		 * we need to set the "forceEos" argument correctly --
+		 * otherwise we potentially hang (cursors cancel on the QEs,
+		 * mark the estate to "cancelUnfinished" and then try to do a
+		 * normal interconnect teardown).
+		 */
+		TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished, false);
+		estate->interconnect_context = NULL;
+		estate->es_interconnect_is_setup = false;
+	}
+
 	/*
 	 * If QD, wait for QEs to finish and check their results.
 	 */
 
@@ -2242,20 +2257,6 @@ void mppExecutorFinishup(QueryDesc *queryDesc)
 		estate->dispatcherState = NULL;
 		cdbdisp_destroyDispatcherState(ds);
 	}
 
-	/* Teardown the Interconnect */
-	if (estate->es_interconnect_is_setup)
-	{
-		/*
-		 * MPP-3413: If we got here during cancellation of a cursor,
-		 * we need to set the "forceEos" argument correctly --
-		 * otherwise we potentially hang (cursors cancel on the QEs,
-		 * mark the estate to "cancelUnfinished" and then try to do a
-		 * normal interconnect teardown).
-		 */
-		TeardownInterconnect(estate->interconnect_context, estate->cancelUnfinished, false);
-		estate->es_interconnect_is_setup = false;
-	}
 }
 
 /*
@@ -2275,6 +2276,12 @@ void mppExecutorCleanup(QueryDesc *queryDesc)
 	/* GPDB hook for collecting query info */
 	if (query_info_collect_hook && QueryCancelCleanup)
 		(*query_info_collect_hook)(METRICS_QUERY_CANCELING, queryDesc);
 
+	/* Clean up the interconnect. */
+	if (estate->es_interconnect_is_setup)
+	{
+		TeardownInterconnect(estate->interconnect_context, true /* force EOS */, true);
+		estate->es_interconnect_is_setup = false;
+	}
 
 	/*
 	 * If this query is being canceled, record that when the gpperfmon
@@ -2299,13 +2306,6 @@ void mppExecutorCleanup(QueryDesc *queryDesc)
 		CdbDispatchHandleError(ds);
 	}
 
-	/* Clean up the interconnect. */
-	if (estate->es_interconnect_is_setup)
-	{
-		TeardownInterconnect(estate->interconnect_context, true /* force EOS */, true);
-		estate->es_interconnect_is_setup = false;
-	}
-
 	/* GPDB hook for collecting query info */
 	if (query_info_collect_hook)
 		(*query_info_collect_hook)(QueryCancelCleanup ? METRICS_QUERY_CANCELED : METRICS_QUERY_ERROR, queryDesc);
@@ -1211,6 +1211,15 @@ PG_TRY();
 			prm->isnull = false;
 		}
 
+	/* Clean up the interconnect. */
+	if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup)
+	{
+		TeardownInterconnect(queryDesc->estate->interconnect_context,
+							 false, false); /* following success on QD */
+		queryDesc->estate->interconnect_context = NULL;
+		queryDesc->estate->es_interconnect_is_setup = false;
+	}
+
 	/*
 	 * If we dispatched to QEs, wait for completion.
 	 */
@@ -1221,15 +1230,7 @@ PG_TRY();
 	{
 		CdbDispatcherState *ds = queryDesc->estate->dispatcherState;
 
-		/*
-		 * We are in a subplan, the eflags always contains EXEC_FLAG_REWIND which
-		 * means we cannot squelch the motion node earlier and some QEs still keep
-		 * sending tuples.
-		 *
-		 * we get all the tuples we needed, DISPATCH_WAIT_FINISH tell QEs stopping
-		 * sending tuples and wait them to complete.
-		 */
-		cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_FINISH);
+		cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);
 
 		/* If EXPLAIN ANALYZE, collect execution stats from qExecs. */
 		if (planstate->instrument && planstate->instrument->need_cdb)
@@ -1245,15 +1246,6 @@ PG_TRY();
 		queryDesc->estate->dispatcherState = NULL;
 		cdbdisp_destroyDispatcherState(ds);
 	}
 
-	/* Clean up the interconnect. */
-	if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup)
-	{
-		TeardownInterconnect(queryDesc->estate->interconnect_context,
-							 false, false); /* following success on QD */
-		queryDesc->estate->interconnect_context = NULL;
-		queryDesc->estate->es_interconnect_is_setup = false;
-	}
 }
 PG_CATCH();
 {
@@ -1279,6 +1271,18 @@ PG_CATCH();
 	/* Restore memory high-water mark for root slice of main query. */
 	MemoryContextSetPeakSpace(planstate->state->es_query_cxt, savepeakspace);
 
+	/*
+	 * Clean up the interconnect.
+	 * CDB TODO: Is this needed following failure on QD?
+	 */
+	if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup)
+	{
+		TeardownInterconnect(queryDesc->estate->interconnect_context,
+							 true, false);
+		queryDesc->estate->interconnect_context = NULL;
+		queryDesc->estate->es_interconnect_is_setup = false;
+	}
+
 	/*
 	 * Request any commands still executing on qExecs to stop.
 	 * Wait for them to finish and clean up the dispatching structures.
@@ -1292,17 +1296,6 @@ PG_CATCH();
 		CdbDispatchHandleError(ds);
 	}
 
-	/*
-	 * Clean up the interconnect.
-	 * CDB TODO: Is this needed following failure on QD?
-	 */
-	if (queryDesc && queryDesc->estate && queryDesc->estate->es_interconnect_is_setup)
-	{
-		TeardownInterconnect(queryDesc->estate->interconnect_context,
-							 true, false);
-		queryDesc->estate->interconnect_context = NULL;
-		queryDesc->estate->es_interconnect_is_setup = false;
-	}
 	PG_RE_THROW();
 }
 PG_END_TRY();