diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 29181ed78faa56eef05ada6a119bf25f5b9ee67c..c75fc70d750b2a8a2f7d70837f536e89eae1180e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_COUNTRY_LEN 20 #define TSDB_LOCALE_LEN 64 #define TSDB_TIMEZONE_LEN 64 +#define TSDB_LABEL_LEN 8 #define TSDB_FQDN_LEN 128 #define TSDB_EP_LEN (TSDB_FQDN_LEN+6) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 9c90a90fc0be3d33bb776db5372bc3013cfb07d2..ecbc470945e8cf44b8d2eebfe46d3f58ecd30fe7 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -47,7 +47,7 @@ typedef struct { uint16_t localPort; int8_t connType; int index; // for UDP server only, round robin for multiple threads - char label[12]; + char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index @@ -88,7 +88,7 @@ typedef struct { } SRpcReqContext; typedef struct SRpcConn { - char info[50];// debug info: label + pConn + ahandle + char info[48];// debug info: label + pConn + ahandle int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 151f5db65f51335a1f59ad2658dfacb089ce981a..04a269502e0a79063ca1a9d8db09e8bb5e31c6e8 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -16,6 +16,7 @@ #include "os.h" #include "tsocket.h" #include "tutil.h" +#include "taosdef.h" #include "taoserror.h" #include "rpcLog.h" #include "rpcHead.h" @@ -46,7 +47,7 @@ typedef struct SThreadObj { int pollFd; int numOfFds; int threadId; - char label[12]; + char label[TSDB_LABEL_LEN]; void *shandle; // handle passed by upper layer during server initialization void *(*processData)(SRecvInfo *pPacket); } SThreadObj; @@ -55,7 +56,7 @@ typedef struct { int fd; uint32_t ip; uint16_t port; - char label[12]; + char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; SThreadObj *pThreadObj; @@ -79,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread return NULL; } + pServerObj->thread = 0; pServerObj->ip = ip; pServerObj->port = port; tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); @@ -93,8 +95,14 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } int code = 0; + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj->pollFd = -1; + pThreadObj->thread = 0; pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; @@ -114,11 +122,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - pthread_attr_t thattr; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); - pthread_attr_destroy(&thattr); if (code != 0) { tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -130,11 +134,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } if (code == 0) { - pthread_attr_t thattr; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); - pthread_attr_destroy(&thattr); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); @@ -142,38 +142,39 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } if (code != 0) { - free(pServerObj->pThreadObj); - free(pServerObj); + taosCleanUpTcpServer(pServerObj); pServerObj = NULL; } else { tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); } + pthread_attr_destroy(&thattr); return (void *)pServerObj; } static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; - - // signal the thread to stop, try graceful method first, - // and use pthread_cancel when failed - struct epoll_event event = { .events = EPOLLIN }; - eventfd_t fd = eventfd(1, 0); - if (fd == -1) { - // failed to create eventfd, call pthread_cancel instead, which may result in data corruption: - tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno)); - pthread_cancel(pThreadObj->thread); - } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { - // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption: - tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno)); - pthread_cancel(pThreadObj->thread); + eventfd_t fd = -1; + + if (pThreadObj->thread && pThreadObj->pollFd >=0) { + // signal the thread to stop, try graceful method first, + // and use pthread_cancel when failed + struct epoll_event event = { .events = EPOLLIN }; + fd = eventfd(1, 0); + if (fd == -1) { + // failed to create eventfd, call pthread_cancel instead, which may result in data corruption: + tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno)); + pthread_cancel(pThreadObj->thread); + } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption: + tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno)); + pthread_cancel(pThreadObj->thread); + } } - pthread_join(pThreadObj->thread, NULL); - close(pThreadObj->pollFd); - if (fd != -1) { - close(fd); - } + if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL); + if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd); + if (fd != -1) close(fd); while (pThreadObj->pHead) { SFdObj *pFdObj = pThreadObj->pHead; @@ -188,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) { SThreadObj *pThreadObj; if (pServerObj == NULL) return; - - shutdown(pServerObj->fd, SHUT_RD); - pthread_join(pServerObj->thread, NULL); + if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); + if(pServerObj->thread) pthread_join(pServerObj->thread, NULL); for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index a8811f41362e32d762a4284c99d88a9bfe4c5e5b..7e2fe0db61a0ffa7e6923679f1c82f84c991382f 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -18,6 +18,7 @@ #include "tsystem.h" #include "ttimer.h" #include "tutil.h" +#include "taosdef.h" #include "rpcLog.h" #include "rpcUdp.h" #include "rpcHead.h" @@ -33,7 +34,7 @@ typedef struct { int fd; uint16_t port; // peer port uint16_t localPort; // local port - char label[12]; // copy from udpConnSet; + char label[TSDB_LABEL_LEN]; // copy from udpConnSet; pthread_t thread; void *hash; void *shandle; // handle passed by upper layer during server initialization @@ -49,7 +50,7 @@ typedef struct { uint16_t port; // local Port void *shandle; // handle passed by upper layer during server initialization int threads; - char label[12]; + char label[TSDB_LABEL_LEN]; void *(*fp)(SRecvInfo *pPacket); SUdpConn udpConn[]; } SUdpConnSet; @@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads } struct sockaddr_in sin; - unsigned int addrlen = sizeof(sin); + unsigned int addrlen = sizeof(sin); if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { pConn->localPort = (uint16_t)ntohs(sin.sin_port); diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 25893969e47c69abe11cc5651b1f02eccb5e61bf..898ab7087617beb5ab2fdff99e7300952edcda82 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -14,6 +14,7 @@ */ #include "os.h" +#include "taosdef.h" #include "tulog.h" #include "tsched.h" #include "ttimer.h" @@ -21,7 +22,7 @@ #define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. typedef struct { - char label[16]; + char label[TSDB_LABEL_LEN]; tsem_t emptySem; tsem_t fullSem; pthread_mutex_t queueMutex;