提交 1c1f1644 编写于 作者: P Pengzhou Tang 提交者: Tang Pengzhou

Fix teardown issue of TCP interconnect

Previously, for an interconnect connection, if no data are available at
sender peer, the sender sends a customized EOS packet to the receiver
and disables further send operations using shutdown(SHUT_WR), then
somehow, the sender closes the connection totally with close()
immediately and it counts on the kernel and TCP stack to guarantee the
data been transformed to the receiver. The problem is, on some platform,
if the connection is closed on one side, the TCP behave is undetermined,
the packets may be lost and receiver may report an unexpected error.

The correct way is sender blocks on the connection until receiver
getting the EOS packet and close its peer, then the sender can close
the connection safely.
上级 2c011ce4
......@@ -2143,7 +2143,21 @@ TeardownTCPInterconnect(ChunkTransportState *transportStates,
/* cleanup a Sending motion node. */
getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);
if (!forceEOS)
/*
* On a normal teardown routine, sender has sent an EOS packet and
* disabled further send operations on phase 1. sender can't close the
* connection immediately because EOS packet or data packets within the
* kernel sending buffer may be lost on some platform if sender close the
* connection totally.
*
* The correct way is sender blocks on the connection until recievers
* get the EOS packets and close the peer, then it's safe for sender to
* the connection totally.
*
* If some errors are happening, senders can skip this step to avoid hung
* issues, QD will take care of the error handling.
*/
if (!hasError)
waitOnOutbound(pEntry);
for (i = 0; i < pEntry->numConns; i++)
......@@ -2459,7 +2473,7 @@ waitOnOutbound(ChunkTransportStateEntry *pEntry)
if (conn_count == 0)
return;
if (InterruptPending)
if (InterruptPending || QueryFinishPending)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "waitOnOutbound(): interrupt pending fast-track");
......@@ -2481,7 +2495,7 @@ waitOnOutbound(ChunkTransportStateEntry *pEntry)
{
saved_err = errno;
if (InterruptPending)
if (InterruptPending || QueryFinishPending)
return;
/*
......@@ -2502,19 +2516,20 @@ waitOnOutbound(ChunkTransportStateEntry *pEntry)
/* ready to read. */
count = recv(conn->sockfd, &buf, sizeof(buf), 0);
if (count == 0) /* done ! */
if (count == 0 || count == 1) /* done ! */
{
/* got a stop message */
AssertImply(count == 1, buf == 'S');
MPP_FD_CLR(conn->sockfd, &waitset);
/* we may have finished */
conn_count--;
continue;
}
if (count > 0 || (count < 0 && errno == EAGAIN))
continue;
/* Some other kind of error happened */
if (errno == EINTR)
else if (count < 0 && (errno == EAGAIN || errno == EINTR))
continue;
/*
* Something unexpected, but probably not horrible warn and
* return
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册