diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemphone.h index 300f1e8ef1bca311e14fc220ca72bedd8a5f4152..c4fc98988a98b751ce0ec4cd49e90f2d013d40dc 100644 --- a/src/os/inc/osSemphone.h +++ b/src/os/inc/osSemphone.h @@ -31,6 +31,7 @@ extern "C" { // TAOS_OS_FUNC_SEMPHONE_PTHREAD bool taosCheckPthreadValid(pthread_t thread); int64_t taosGetPthreadId(); +void taosResetPthread(pthread_t *thread); #ifdef __cplusplus } diff --git a/src/os/inc/osWindows.h b/src/os/inc/osWindows.h index 63d51287837746f91fe0ee31ed09bf3f78081029..1118b3c64b7bd1296a2b083143e444a665c2264e 100644 --- a/src/os/inc/osWindows.h +++ b/src/os/inc/osWindows.h @@ -160,6 +160,9 @@ char * strndup(const char *s, size_t n); #define TCP_KEEPCNT 0x1234 #define TCP_KEEPIDLE 0x1234 #define TCP_KEEPINTVL 0x1234 +#define SHUT_RDWR SD_BOTH +#define SHUT_RD SD_RECEIVE +#define SHUT_WR SD_SEND #define LOCK_EX 1 #define LOCK_NB 2 diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemphone.c index 82b916b4d73080c55da69ac090d9b5b56e23685b..1f1ef268c61c86e11ef67fa588fc0b2f5bffebfa 100644 --- a/src/os/src/detail/osSemphone.c +++ b/src/os/src/detail/osSemphone.c @@ -20,5 +20,6 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } +void taosResetPthread(pthread_t *thread) { *thread = 0; } #endif \ No newline at end of file diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index 3f76b202c9e85eecbe216f7226370f8221250512..751f03e52ac3d8b4cb1a36f7330bbd4c7faef328 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -82,7 +82,7 @@ void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, pCache->cleanFp = cleanFp; pCache->tmrCtrl = tmrCtrl; pCache->lockedBy = calloc(sizeof(int64_t), maxSessions); - taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); + taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer); pthread_mutex_init(&pCache->mutex, NULL); @@ -226,7 +226,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) { } // tTrace("timer, total connections in cache:%d", pCache->total); - taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); + taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer); } static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 01c753ec5b0b415eb015c92df2c5f45da832aac3..29afe8d2abe2535d8fc9f605fcb68286f85cc6d8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -33,7 +33,7 @@ #include "rpcHead.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) +#define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead))) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) @@ -359,7 +359,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { SRpcReqContext *pContext; int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); - pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext->ahandle = pMsg->ahandle; pContext->signature = pContext; pContext->pRpc = (SRpcInfo *)shandle; @@ -492,7 +492,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; - pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); memset(pRsp, 0, sizeof(SRpcMsg)); @@ -654,7 +654,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->pRpc = pRpc; pConn->sid = sid; - pConn->tranId = (uint16_t)(random() & 0xFFFF); + pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId); pConn->spi = pRpc->spi; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 7fa05af9b040ef6a962ad236eaeb8e2747218fdb..c69456b8e282dcad61d3c30fca89efabdc8ecbda 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -81,7 +81,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pServerObj->fd = -1; - pServerObj->thread = 0; + taosResetPthread(&pServerObj->thread); pServerObj->ip = ip; pServerObj->port = port; tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); @@ -104,7 +104,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { pThreadObj->pollFd = -1; - pThreadObj->thread = 0; + taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index f75375e2fea16efa924eaac0c6427e3608dc07ae..7e59210caee47c533ed44fcc94eed11b5aa1b29d 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -144,7 +144,9 @@ void taosStopUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - if (pConn->thread) pthread_join(pConn->thread, NULL); + if (taosCheckPthreadValid(pConn->thread)) { + pthread_join(pConn->thread, NULL); + } taosTFree(pConn->buffer); // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); }