提交 3e1cc863 编写于 作者: P Pengzhou Tang 提交者: Tang Pengzhou

Fix a race condition in flushBuffer

flushBuffer() is used to send packets through TCP interconnect, before
sending, it first check whether receiver stopped or teared down the
interconnect, however, there is window between checking and sending, the
receiver may tear down the interconnect and close the peer, so send()
will report an error, to resolve this, we recheck whether the receiver
stopped or teared down the interconnect in this window and don't error
out in that case.
Reviewed-by: NJinbao Chen <jinchen@pivotal.io>
Reviewed-by: NHao Wu <hawu@pivotal.io>
上级 7c90c04f
......@@ -2701,7 +2701,7 @@ flushBuffer(ChunkTransportState *transportStates,
{
struct timeval timeout;
/* check for stop message before sending anything */
/* check for stop message or peer teardown before sending anything */
timeout.tv_sec = 0;
timeout.tv_usec = 0;
MPP_FD_ZERO(&rset);
......@@ -2725,6 +2725,7 @@ flushBuffer(ChunkTransportState *transportStates,
if ((n = send(conn->sockfd, sendptr + sent, conn->msgSize - sent, 0)) < 0)
{
int send_errno = errno;
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
if (errno == EINTR)
continue;
......@@ -2768,8 +2769,8 @@ flushBuffer(ChunkTransportState *transportStates,
/*
* as a sender... if there is something to read... it must
* mean its a StopSendingMessage. we don't even bother to
* read it.
* mean its a StopSendingMessage or receiver has teared down
* the interconnect, we don't even bother to read it.
*/
if (MPP_FD_ISSET(conn->sockfd, &rset) || transportStates->teardownActive)
{
......@@ -2795,11 +2796,35 @@ flushBuffer(ChunkTransportState *transportStates,
conn->stillActive = false;
return false;
}
/* check whether receiver has teared down the interconnect */
timeout.tv_sec = 0;
timeout.tv_usec = 0;
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
n = select(conn->sockfd + 1, (fd_set *) &rset, NULL, NULL, &timeout);
/*
* as a sender... if there is something to read... it must
* mean its a StopSendingMessage or receiver has teared down
* the interconnect, we don't even bother to read it.
*/
if (n > 0 && MPP_FD_ISSET(conn->sockfd, &rset))
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
/* got a stop message */
conn->stillActive = false;
return false;
}
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error writing an outgoing packet"),
errdetail("Error during send() call (error:%d) for remote connection: contentId=%d at %s",
errno, conn->remoteContentId,
send_errno, conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册