提交 e90d241e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

stop UDP/TCP connection first, then close all connections, then clean up

上级 de8c9fc4
...@@ -21,6 +21,7 @@ extern "C" { ...@@ -21,6 +21,7 @@ extern "C" {
#endif #endif
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosStopTcpServer(void *param);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle);
......
...@@ -147,12 +147,19 @@ void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, vo ...@@ -147,12 +147,19 @@ void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, vo
}; };
void (*taosCleanUpConn[])(void *thandle) = { void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpUdpConnection, NULL,
taosCleanUpUdpConnection, NULL,
taosCleanUpTcpServer, taosCleanUpTcpServer,
taosCleanUpTcpClient taosCleanUpTcpClient
}; };
void (*taosStopConn[])(void *thandle) = {
taosCleanUpUdpConnection,
taosCleanUpUdpConnection,
taosStopTcpServer,
NULL
};
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData,
taosSendUdpData, taosSendUdpData,
...@@ -289,14 +296,26 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -289,14 +296,26 @@ void *rpcOpen(const SRpcInit *pInit) {
void rpcClose(void *param) { void rpcClose(void *param) {
SRpcInfo *pRpc = (SRpcInfo *)param; SRpcInfo *pRpc = (SRpcInfo *)param;
// stop connection to outside first
if (taosStopConn[pRpc->connType | RPC_CONN_TCP])
(*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
if (taosStopConn[pRpc->connType])
(*taosStopConn[pRpc->connType])(pRpc->udphandle);
// close all connections
for (int i = 0; i < pRpc->sessions; ++i) { for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList && pRpc->connList[i].user[0]) { if (pRpc->connList && pRpc->connList[i].user[0]) {
rpcCloseConn((void *)(pRpc->connList + i)); rpcCloseConn((void *)(pRpc->connList + i));
} }
} }
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); // clean up
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); if (taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
if (taosCleanUpConn[pRpc->connType])
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
tTrace("%s rpc is closed", pRpc->label); tTrace("%s rpc is closed", pRpc->label);
rpcDecRef(pRpc); rpcDecRef(pRpc);
...@@ -588,6 +607,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -588,6 +607,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
pConn->inTranId = 0; pConn->inTranId = 0;
pConn->outTranId = 0; pConn->outTranId = 0;
pConn->secured = 0; pConn->secured = 0;
pConn->peerId = 0;
pConn->peerIp = 0; pConn->peerIp = 0;
pConn->peerPort = 0; pConn->peerPort = 0;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
...@@ -627,6 +647,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -627,6 +647,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt; pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
tTrace("%s %p client connection is allocated", pRpc->label, pConn);
} }
return pConn; return pConn;
...@@ -681,6 +702,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -681,6 +702,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p server connection is allocated", pRpc->label, pConn);
} }
return pConn; return pConn;
......
...@@ -190,14 +190,19 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { ...@@ -190,14 +190,19 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
} }
} }
void taosStopTcpServer(void *handle) {
void taosCleanUpTcpServer(void *handle) {
SServerObj *pServerObj = handle; SServerObj *pServerObj = handle;
SThreadObj *pThreadObj;
tTrace("TCP:%s, stop accept new connections", pServerObj->label);
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
if(pServerObj->thread) pthread_join(pServerObj->thread, NULL); if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
}
void taosCleanUpTcpServer(void *handle) {
SServerObj *pServerObj = handle;
SThreadObj *pThreadObj;
if (pServerObj == NULL) return;
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
...@@ -226,7 +231,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -226,7 +231,7 @@ static void *taosAcceptTcpConnection(void *arg) {
connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
if (connFd == -1) { if (connFd == -1) {
if (errno == EINVAL) { if (errno == EINVAL) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); tTrace("%s TCP server stop accepting new connections, exiting", pServerObj->label);
break; break;
} }
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#define RPC_MAX_UDP_SIZE 65480 #define RPC_MAX_UDP_SIZE 65480
typedef struct { typedef struct {
void *signature;
int index; int index;
int fd; int fd;
uint16_t port; // peer port uint16_t port; // peer port
...@@ -111,7 +110,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads ...@@ -111,7 +110,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn->processData = fp; pConn->processData = fp;
pConn->index = i; pConn->index = i;
pConn->pSet = pSet; pConn->pSet = pSet;
pConn->signature = pConn;
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
if (code != 0) { if (code != 0) {
...@@ -140,8 +138,6 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -140,8 +138,6 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pConn->signature = NULL;
if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR);
if (pConn->fd >=0) taosCloseSocket(pConn->fd); if (pConn->fd >=0) taosCloseSocket(pConn->fd);
} }
...@@ -185,7 +181,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -185,7 +181,7 @@ static void *taosRecvUdpData(void *param) {
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if(dataLen <= 0) { if(dataLen <= 0) {
tTrace("%s UDP socket was closed, exiting", pConn->label); tTrace("%s UDP socket was closed, exiting(%s)", pConn->label, strerror(errno));
break; break;
} }
...@@ -221,7 +217,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -221,7 +217,7 @@ static void *taosRecvUdpData(void *param) {
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle; SUdpConn *pConn = (SUdpConn *)chandle;
if (pConn == NULL || pConn->signature != pConn) return -1; if (pConn == NULL) return -1;
struct sockaddr_in destAdd; struct sockaddr_in destAdd;
memset(&destAdd, 0, sizeof(destAdd)); memset(&destAdd, 0, sizeof(destAdd));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册