提交 0f7cb679 编写于 作者: H Heikki Linnakangas

Move function to more logical place.

SendDummyPacket() is completely specific to the UDP interconnect
implementation.

Along the way, I couldn't resist some cosmetic cleanup: use %m rather than
strerror(errno), avoid unnecessary variable initializations, and pgindent.
上级 f6006651
......@@ -793,114 +793,6 @@ void adjustMasterRouting(Slice *recvSlice)
}
}
void
SendDummyPacket(void)
{
int sockfd = -1;
int ret = -1;
struct addrinfo* addrs = NULL;
struct addrinfo* rp = NULL;
struct addrinfo hint;
uint16 udp_listener;
char port_str[32] = {0};
char* dummy_pkt = "stop it";
/*
* Get address info from interconnect udp listener port
*/
udp_listener = Gp_listener_port;
snprintf(port_str, sizeof(port_str), "%d", udp_listener);
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_DGRAM;
hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */
#ifdef AI_NUMERICSERV
hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; /* Never do name resolution */
#else
hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */
#endif
ret = pg_getaddrinfo_all(NULL, port_str, &hint, &addrs);
if (ret || !addrs)
{
elog(LOG, "Send dummy packet failed, pg_getaddrinfo_all(): %s", strerror(errno));
goto send_error;
}
for (rp = addrs; rp != NULL; rp = rp->ai_next)
{
/* Create socket according to pg_getaddrinfo_all() */
sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sockfd < 0)
{
continue;
}
if (!pg_set_noblock(sockfd))
{
if (sockfd >= 0)
{
closesocket(sockfd);
sockfd = -1;
}
continue;
}
break;
}
if (rp == NULL)
{
elog(LOG, "Send dummy packet failed, create socket failed: %s", strerror(errno));
goto send_error;
}
/*
* Send a dummy package to the interconnect listener, try 10 times
*/
int counter = 0;
while (counter < 10)
{
counter++;
ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen);
if (ret < 0)
{
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
{
continue;
}
else
{
elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno));
goto send_error;
}
}
break;
}
if (counter >= 10)
{
elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno));
goto send_error;
}
pg_freeaddrinfo_all(hint.ai_family, addrs);
closesocket(sockfd);
return;
send_error:
if (addrs)
{
pg_freeaddrinfo_all(hint.ai_family, addrs);
}
if (sockfd != -1)
{
closesocket(sockfd);
}
return;
}
/*
* WaitInterconnectQuit
*
......
......@@ -643,6 +643,7 @@ static void setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int r
static void checkRxThreadError(void);
static void setRxThreadError(int eno);
static void resetRxThreadError(void);
static void SendDummyPacket(void);
static void getSockAddr(struct sockaddr_storage * peer, socklen_t * peer_len, const char * listenerAddr, int listenerPort);
static void setXmitSocketOptions(int txfd);
......@@ -6686,3 +6687,109 @@ WaitInterconnectQuitUDPIFC(void)
}
ic_control_info.threadCreated = false;
}
/*
* Send a dummy packet to interconnect thread to exit poll() immediately
*/
static void
SendDummyPacket(void)
{
int sockfd = -1;
int ret;
struct addrinfo *addrs = NULL;
struct addrinfo *rp;
struct addrinfo hint;
uint16 udp_listener;
char port_str[32] = {0};
char *dummy_pkt = "stop it";
int counter;
/*
* Get address info from interconnect udp listener port
*/
udp_listener = Gp_listener_port;
snprintf(port_str, sizeof(port_str), "%d", udp_listener);
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_DGRAM;
hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */
/* Never do name resolution */
#ifdef AI_NUMERICSERV
hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
#else
hint.ai_flags = AI_NUMERICHOST;
#endif
ret = pg_getaddrinfo_all(NULL, port_str, &hint, &addrs);
if (ret || !addrs)
{
elog(LOG, "send dummy packet failed, pg_getaddrinfo_all(): %m");
goto send_error;
}
for (rp = addrs; rp != NULL; rp = rp->ai_next)
{
/* Create socket according to pg_getaddrinfo_all() */
sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sockfd < 0)
continue;
if (!pg_set_noblock(sockfd))
{
if (sockfd >= 0)
{
closesocket(sockfd);
sockfd = -1;
}
continue;
}
break;
}
if (rp == NULL)
{
elog(LOG, "send dummy packet failed, create socket failed: %m");
goto send_error;
}
/*
* Send a dummy package to the interconnect listener, try 10 times
*/
counter = 0;
while (counter < 10)
{
counter++;
ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen);
if (ret < 0)
{
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
continue;
else
{
elog(LOG, "send dummy packet failed, sendto failed: %m");
goto send_error;
}
}
break;
}
if (counter >= 10)
{
elog(LOG, "send dummy packet failed, sendto failed: %m");
goto send_error;
}
pg_freeaddrinfo_all(hint.ai_family, addrs);
closesocket(sockfd);
return;
send_error:
if (addrs)
pg_freeaddrinfo_all(hint.ai_family, addrs);
if (sockfd != -1)
closesocket(sockfd);
return;
}
......@@ -50,11 +50,6 @@ extern void CleanUpMotionLayerIPC(void);
*/
extern void WaitInterconnectQuit(void);
/*
* Send a dummy packet to interconnect thread to exit poll() immediately
*/
extern void SendDummyPacket(void);
/* Returns the fd of the socket that connects to the seqserver. This value
* is -1 if it has not been setup.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册