提交 a61f62af 编写于 作者: P Pengzhou Tang 提交者: Tang Pengzhou

Monitor dispatcher connection when receiving from TCP interconnect

This is mainly to resolve slow response to sequence requests under
TCP interconnect, sequence requests are sent through libpqs from
QEs to QD (we call them dispatcher connections). In the past, under
TCP interconnect, QD checked the events on dispatcher connections
every 2 seconds, obviously it's inefficient.

Under UDPIFC mode, QD also monitors the dispatcher connections when
receving tuples from QEs so QD can process sequence requests in
time, this commit applies the same logic to the TCP interconnect.
Reviewed-by: NHao Wu <gfphoenix78@gmail.com>
Reviewed-by: NNing Yu <nyu@pivotal.io>
上级 8f300b64
...@@ -2498,8 +2498,7 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates, ...@@ -2498,8 +2498,7 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
i, i,
index; index;
bool skipSelect = false; bool skipSelect = false;
int waitFd = PGINVALID_SOCKET;
#ifdef AMS_VERBOSE_LOGGING #ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "RecvTupleChunkFromAny(motNodeId=%d)", motNodeID); elog(DEBUG5, "RecvTupleChunkFromAny(motNodeId=%d)", motNodeID);
...@@ -2512,16 +2511,16 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates, ...@@ -2512,16 +2511,16 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
do do
{ {
/* Every 2 seconds */ /* Every 2 seconds */
if (retry++ > 4) if (Gp_role == GP_ROLE_DISPATCH && retry++ > 4)
{ {
retry = 0; retry = 0;
/* check to see if the dispatcher should cancel */ /* check to see if the dispatcher should cancel */
if (Gp_role == GP_ROLE_DISPATCH) checkForCancelFromQD(transportStates);
checkForCancelFromQD(transportStates);
} }
struct timeval timeout = tval; struct timeval timeout = tval;
int nfds = pEntry->highReadSock;
/* make sure we check for these. */ /* make sure we check for these. */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive); ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
...@@ -2550,7 +2549,22 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates, ...@@ -2550,7 +2549,22 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
if (skipSelect) if (skipSelect)
break; break;
n = select(pEntry->highReadSock + 1, (fd_set *) &rset, NULL, NULL, &timeout); /*
* Also monitor the events on dispatch fds, eg, errors or sequence
* request from QEs.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
waitFd = cdbdisp_getWaitSocketFd(transportStates->estate->dispatcherState);
if (waitFd != PGINVALID_SOCKET)
{
MPP_FD_SET(waitFd, &rset);
if (waitFd > nfds)
nfds = waitFd;
}
}
n = select(nfds + 1, (fd_set *) &rset, NULL, NULL, &timeout);
if (n < 0) if (n < 0)
{ {
if (errno == EINTR) if (errno == EINTR)
...@@ -2560,6 +2574,13 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates, ...@@ -2560,6 +2574,13 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
errmsg("interconnect error receiving an incoming packet"), errmsg("interconnect error receiving an incoming packet"),
errdetail("%s: %m", "select"))); errdetail("%s: %m", "select")));
} }
else if (n > 0 && waitFd != PGINVALID_SOCKET && MPP_FD_ISSET(waitFd, &rset))
{
/* handle events on dispatch connection */
checkForCancelFromQD(transportStates);
n--;
}
#ifdef AMS_VERBOSE_LOGGING #ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "RecvTupleChunkFromAny() select() returned %d ready sockets", n); elog(DEBUG5, "RecvTupleChunkFromAny() select() returned %d ready sockets", n);
#endif #endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册