From 4c19f566864336808834dde07298c69afcfdbae3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Feb 2022 00:11:35 +0800 Subject: [PATCH] fix bug --- cmake/cmake.options | 4 +- include/libs/transport/trpc.h | 1 + source/dnode/mgmt/impl/src/dndTransport.c | 7 +- source/dnode/mgmt/impl/test/sut/inc/client.h | 7 +- source/dnode/mgmt/impl/test/sut/inc/sut.h | 7 +- .../dnode/mgmt/impl/test/sut/src/client.cpp | 33 ++++-- source/dnode/mgmt/impl/test/sut/src/sut.cpp | 17 ++- source/dnode/mnode/impl/src/mnode.c | 9 +- source/dnode/mnode/impl/test/qnode/qnode.cpp | 7 +- source/dnode/mnode/impl/test/trans/trans.cpp | 4 +- source/dnode/mnode/impl/test/user/user.cpp | 3 +- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/rpcMain.c | 1 + source/libs/transport/src/trans.c | 4 +- source/libs/transport/src/transCli.c | 84 +++++++++----- source/libs/transport/src/transComm.c | 21 ++-- source/libs/transport/src/transSrv.c | 103 ++++++++++++------ source/os/src/osTimer.c | 82 +++++++------- source/util/src/ttimer.c | 67 +++++++++--- 20 files changed, 296 insertions(+), 168 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index 343bc16260..e19c10f6b2 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -47,13 +47,13 @@ option( option( BUILD_WITH_UV "If build with libuv" - OFF + ON ) option( BUILD_WITH_UV_TRANS "If build with libuv_trans " - OFF + ON ) option( diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index f913ba06d0..538aeb1a0e 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -64,6 +64,7 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled + bool noPool; // create conn pool or not // the following is for client app ecurity only char *user; // user name char spi; // security parameter index diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 931cda475c..a006712355 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -154,7 +154,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode *pDnode = parent; + SDnode * pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -192,6 +192,7 @@ static int32_t dndInitClient(SDnode *pDnode) { rpcInit.ckey = INTERNAL_CKEY; rpcInit.spi = 1; rpcInit.parent = pDnode; + rpcInit.noPool = true; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); @@ -217,7 +218,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode *pDnode = param; + SDnode * pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -311,7 +312,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); + void * pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/dnode/mgmt/impl/test/sut/inc/client.h b/source/dnode/mgmt/impl/test/sut/inc/client.h index 9cf688fc02..925680d528 100644 --- a/source/dnode/mgmt/impl/test/sut/inc/client.h +++ b/source/dnode/mgmt/impl/test/sut/inc/client.h @@ -21,16 +21,21 @@ class TestClient { bool Init(const char* user, const char* pass, const char* fqdn, uint16_t port); void Cleanup(); + void DoInit(); + SRpcMsg* SendReq(SRpcMsg* pReq); void SetRpcRsp(SRpcMsg* pRsp); tsem_t* GetSem(); + void Restart(); private: char fqdn[TSDB_FQDN_LEN]; uint16_t port; + char user[128]; + char pass[128]; void* clientRpc; SRpcMsg* pRsp; tsem_t sem; }; -#endif /* _TD_TEST_CLIENT_H_ */ \ No newline at end of file +#endif /* _TD_TEST_CLIENT_H_ */ diff --git a/source/dnode/mgmt/impl/test/sut/inc/sut.h b/source/dnode/mgmt/impl/test/sut/inc/sut.h index 23913b0531..250d563a8b 100644 --- a/source/dnode/mgmt/impl/test/sut/inc/sut.h +++ b/source/dnode/mgmt/impl/test/sut/inc/sut.h @@ -20,10 +20,10 @@ #include "os.h" #include "dnode.h" -#include "tmsg.h" #include "tconfig.h" #include "tdataformat.h" #include "tglobal.h" +#include "tmsg.h" #include "tnote.h" #include "trpc.h" #include "tthread.h" @@ -39,6 +39,7 @@ class Testbase { void Restart(); void ServerStop(); void ServerStart(); + void ClientRestart(); SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen); private: @@ -100,7 +101,7 @@ class Testbase { { \ char* bytes = (char*)calloc(1, len); \ for (int32_t i = 0; i < len - 1; ++i) { \ - bytes[i] = b; \ + bytes[i] = b; \ } \ EXPECT_STREQ(test.GetShowBinary(len), bytes); \ } @@ -138,4 +139,4 @@ class Testbase { #define IgnoreTimestamp() \ { test.GetShowTimestamp(); } -#endif /* _TD_TEST_BASE_H_ */ \ No newline at end of file +#endif /* _TD_TEST_BASE_H_ */ diff --git a/source/dnode/mgmt/impl/test/sut/src/client.cpp b/source/dnode/mgmt/impl/test/sut/src/client.cpp index 8403dbf034..589c015013 100644 --- a/source/dnode/mgmt/impl/test/sut/src/client.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/client.cpp @@ -13,33 +13,38 @@ * along with this program. If not, see . */ -#include "tep.h" #include "sut.h" +#include "tep.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { TestClient* client = (TestClient*)parent; client->SetRpcRsp(pRsp); - uInfo("response:%s from dnode, code:0x%x", TMSG_INFO(pRsp->msgType), pRsp->code); + uInfo("x response:%s from dnode, code:0x%x, msgSize: %d", TMSG_INFO(pRsp->msgType), pRsp->code, pRsp->contLen); tsem_post(client->GetSem()); } -void TestClient::SetRpcRsp(SRpcMsg* pRsp) { this->pRsp = pRsp; }; +void TestClient::SetRpcRsp(SRpcMsg* rsp) { + this->pRsp = (SRpcMsg*)calloc(1, sizeof(SRpcMsg)); + this->pRsp->msgType = rsp->msgType; + this->pRsp->code = rsp->code; + this->pRsp->pCont = rsp->pCont; + this->pRsp->contLen = rsp->contLen; +}; tsem_t* TestClient::GetSem() { return &sem; } -bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) { +void TestClient::DoInit() { char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); - SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = (char*)"DND-C"; + rpcInit.label = (char*)"shell"; rpcInit.numOfThreads = 1; rpcInit.cfp = processClientRsp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = 30 * 1000; - rpcInit.user = (char*)user; + rpcInit.user = (char*)this->user; rpcInit.ckey = (char*)"key"; rpcInit.parent = this; rpcInit.secret = (char*)secretEncrypt; @@ -47,11 +52,16 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint clientRpc = rpcOpen(&rpcInit); ASSERT(clientRpc); + tsem_init(&this->sem, 0, 0); +} - tsem_init(&sem, 0, 0); +bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) { strcpy(this->fqdn, fqdn); + strcpy(this->user, user); + strcpy(this->pass, pass); this->port = port; - + // this->pRsp = NULL; + this->DoInit(); return true; } @@ -60,11 +70,16 @@ void TestClient::Cleanup() { rpcClose(clientRpc); } +void TestClient::Restart() { + this->Cleanup(); + this->DoInit(); +} SRpcMsg* TestClient::SendReq(SRpcMsg* pReq) { SEpSet epSet = {0}; addEpIntoEpSet(&epSet, fqdn, port); rpcSendRequest(clientRpc, &epSet, pReq, NULL); tsem_wait(&sem); + uInfo("y response:%s from dnode, code:0x%x, msgSize: %d", TMSG_INFO(pRsp->msgType), pRsp->code, pRsp->contLen); return pRsp; } diff --git a/source/dnode/mgmt/impl/test/sut/src/sut.cpp b/source/dnode/mgmt/impl/test/sut/src/sut.cpp index 09a738be3b..771c5886ef 100644 --- a/source/dnode/mgmt/impl/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/sut.cpp @@ -21,9 +21,9 @@ void Testbase::InitLog(const char* path) { mDebugFlag = 143; cDebugFlag = 0; jniDebugFlag = 0; - tmrDebugFlag = 0; - uDebugFlag = 0; - rpcDebugFlag = 0; + tmrDebugFlag = 143; + uDebugFlag = 143; + rpcDebugFlag = 143; qDebugFlag = 0; wDebugFlag = 0; sDebugFlag = 0; @@ -66,16 +66,21 @@ void Testbase::Init(const char* path, int16_t port) { void Testbase::Cleanup() { tFreeSTableMetaRsp(&metaRsp); - server.Stop(); client.Cleanup(); + taosMsleep(10); + server.Stop(); dndCleanup(); } -void Testbase::Restart() { server.Restart(); } +void Testbase::Restart() { + server.Restart(); + client.Restart(); +} void Testbase::ServerStop() { server.Stop(); } void Testbase::ServerStart() { server.DoStart(); } +void Testbase::ClientRestart() { client.Restart(); } SRpcMsg* Testbase::SendReq(tmsg_t msgType, void* pCont, int32_t contLen) { SRpcMsg rpcMsg = {0}; @@ -194,4 +199,4 @@ int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; } STableMetaRsp* Testbase::GetShowMeta() { return &metaRsp; } -SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; } \ No newline at end of file +SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 699ccab92c..64b4aa6dd7 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -77,7 +77,7 @@ static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } @@ -89,7 +89,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } @@ -404,7 +404,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { return NULL; } - if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) { + if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && + pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) { SRpcConnInfo connInfo = {0}; if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { taosFreeQitem(pMsg); @@ -439,7 +440,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; + void * ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType & 1U); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); diff --git a/source/dnode/mnode/impl/test/qnode/qnode.cpp b/source/dnode/mnode/impl/test/qnode/qnode.cpp index d4e308268a..b8a0e61ca3 100644 --- a/source/dnode/mnode/impl/test/qnode/qnode.cpp +++ b/source/dnode/mnode/impl/test/qnode/qnode.cpp @@ -190,6 +190,9 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq); server2.Stop(); + taosMsleep(1000); + // test.ClientRestart(); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); @@ -226,6 +229,7 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { { // server start, wait until the rollback finished server2.DoStart(); + test.ClientRestart(); taosMsleep(1000); int32_t retry = 0; @@ -248,7 +252,6 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { ASSERT_NE(retry, retryMax); } } - TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { { // send message first, then dnode2 crash, result is returned, and rollback is started @@ -315,4 +318,4 @@ TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { ASSERT_NE(retry, retryMax); } -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/test/trans/trans.cpp b/source/dnode/mnode/impl/test/trans/trans.cpp index 8a62ed639a..fe3872aba8 100644 --- a/source/dnode/mnode/impl/test/trans/trans.cpp +++ b/source/dnode/mnode/impl/test/trans/trans.cpp @@ -46,8 +46,10 @@ class MndTestTrans : public ::testing::Test { free(buffer); taosFsyncFile(fd); taosCloseFile(fd); + taosMsleep(1000); test.ServerStart(); + test.ClientRestart(); } static Testbase test; @@ -200,4 +202,4 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { test.SendShowRetrieveReq(); EXPECT_EQ(test.GetShowRows(), 2); } -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp index d8ce599be1..61b99beeb7 100644 --- a/source/dnode/mnode/impl/test/user/user.cpp +++ b/source/dnode/mnode/impl/test/user/user.cpp @@ -617,6 +617,7 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) { // restart test.Restart(); + taosMsleep(1000); test.SendShowMetaReq(TSDB_MGMT_TABLE_USER, ""); CHECK_META("show users", 4); @@ -631,4 +632,4 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) { CheckTimestamp(); CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN); -} \ No newline at end of file +} diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d5a8cf5f84..2078a218ee 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -217,7 +217,7 @@ typedef struct SConnBuffer { char* buf; int len; int cap; - int left; + int total; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 3c8c922d83..a36b671eb4 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -56,6 +56,7 @@ typedef struct { int8_t connType; int64_t index; char label[TSDB_LABEL_LEN]; + bool noPool; // pool or not char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 3bb7d103d7..72c1ff6893 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -64,6 +64,7 @@ typedef struct { void (*cfp)(void *parent, SRpcMsg *, SEpSet *); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); + bool noPool; int32_t refCount; void * parent; void * idPool; // handle to ID pool diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c3d3cfa2ab..48c15ca286 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -27,7 +27,7 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); } pRpc->cfp = pInit->cfp; if (pInit->connType == TAOS_CONN_SERVER) { @@ -35,6 +35,8 @@ void* rpcOpen(const SRpcInit* pInit) { } else { pRpc->numOfThreads = pInit->numOfThreads; } + + pRpc->noPool = pInit->noPool; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f1bd1ba980..4ed1cc2e73 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -126,6 +126,9 @@ static void clientHandleResp(SCliConn* conn) { pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + // buf's mem alread translated to rpcMsg.pCont + transClearBuffer(&conn->readBuf); + SRpcMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = transContFromHead((char*)pHead); @@ -140,9 +143,9 @@ static void clientHandleResp(SCliConn* conn) { tDebug("client conn %p persist by app", conn); } - tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), - inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), - ntohs(conn->locaddr.sin_port)); + tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn, + TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; if (conn->push != NULL && conn->ctnRdCnt != 0) { @@ -150,26 +153,26 @@ static void clientHandleResp(SCliConn* conn) { conn->push = NULL; } else { if (pCtx->pSem == NULL) { - tTrace("client conn %p handle resp", conn); + tTrace("%s client conn %p handle resp", pRpc->label, conn); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); } else { - tTrace("client conn(sync) %p handle resp", conn); + tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } } conn->ctnRdCnt += 1; - // buf's mem alread translated to rpcMsg.pCont - transClearBuffer(&conn->readBuf); - uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); SCliThrdObj* pThrd = conn->hostThrd; // user owns conn->persist = 1 if (conn->push == NULL && conn->persist == 0) { - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + if (pRpc->noPool == true) { + } else { + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + } } destroyCmsg(conn->data); conn->data = NULL; @@ -184,7 +187,6 @@ static void clientHandleExcept(SCliConn* pConn) { clientConnDestroy(pConn, true); return; } - tTrace("client conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; tmsg_t msgType = TDMT_MND_CONNECT; @@ -213,6 +215,7 @@ static void clientHandleExcept(SCliConn* pConn) { } pConn->push = NULL; } + tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn); if (pConn->push == NULL) { destroyCmsg(pConn->data); pConn->data = NULL; @@ -226,7 +229,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tTrace("client conn timeout, try to remove expire conn from conn pool"); + tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -307,21 +310,30 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { QUEUE_PUSH(&plist->conn, &conn->conn); } static bool clientReadComplete(SConnBuffer* data) { - STransMsgHead head; - int32_t headLen = sizeof(head); - if (data->len >= headLen) { - memcpy((char*)&head, data->buf, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); - if (msgLen > data->len) { - data->left = msgLen - data->len; - return false; - } else if (msgLen == data->len) { - data->left = 0; - return true; - } - } else { - return false; - } + if (data->len >= sizeof(STransMsgHead)) { + STransMsgHead head; + memcpy((char*)&head, data->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + data->total = msgLen; + } + + if (data->len == data->cap && data->total == data->cap) { + return true; + } + return false; + // if (data->len >= headLen) { + // memcpy((char*)&head, data->buf, headLen); + // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + // if (msgLen > data->len) { + // data->left = msgLen - data->len; + // return false; + // } else if (msgLen == data->len) { + // data->left = 0; + // return true; + // } + //} else { + // return false; + //} } static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; @@ -338,7 +350,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - uv_read_stop((uv_stream_t*)conn->stream); + // uv_read_stop((uv_stream_t*)conn->stream); tTrace("client conn %p read complete", conn); clientHandleResp(conn); } else { @@ -346,6 +358,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf } return; } + if (nread == UV_EOF) { + tError("client conn %p read error: %s", conn, uv_err_name(nread)); + clientHandleExcept(conn); + } assert(nread <= 0); if (nread == 0) { // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb @@ -353,7 +369,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf // read(2). return; } - if (nread < 0 || nread == UV_EOF) { + if (nread < 0) { tError("client conn %p read error: %s", conn, uv_err_name(nread)); clientHandleExcept(conn); } @@ -467,6 +483,7 @@ static void clientConnCb(uv_connect_t* req, int status) { static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("client work thread %p start to quit", pThrd); destroyCmsg(pMsg); + destroyConnPool(pThrd->pool); // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL); uv_timer_stop(pThrd->timer); pThrd->quit = true; @@ -483,7 +500,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle == NULL) { - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (pCtx->pTransInst->noPool == true) { + } else { + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + } if (conn != NULL) { tTrace("client conn %p get from conn pool", conn); } @@ -512,7 +532,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - + uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1); + int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1); + if (ret) { + tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret)); + } // write req handle conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq->data = conn; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7aa5aa16f1..388e0da4e0 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -205,6 +205,7 @@ int transInitBuffer(SConnBuffer* buf) { } int transClearBuffer(SConnBuffer* buf) { memset(buf, 0, sizeof(*buf)); + buf->total = -1; return 0; } int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { @@ -214,27 +215,25 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user * info--->| */ - static const int CAPACITY = 1024; + static const int CAPACITY = sizeof(STransMsgHead); SConnBuffer* p = connBuf; if (p->cap == 0) { p->buf = (char*)calloc(CAPACITY, sizeof(char)); p->len = 0; p->cap = CAPACITY; - p->left = -1; + p->total = 0; uvBuf->base = p->buf; uvBuf->len = CAPACITY; } else { - if (p->len >= p->cap) { - if (p->left == -1) { - p->cap *= 2; - p->buf = realloc(p->buf, p->cap); - } else if (p->len + p->left > p->cap) { - p->cap = p->len + p->left; - p->buf = realloc(p->buf, p->len + p->left); - } - } + STransMsgHead head; + memcpy((char*)&head, p->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + + p->total = msgLen; + p->cap = msgLen; + p->buf = realloc(p->buf, p->cap); uvBuf->base = p->buf + p->len; uvBuf->len = p->cap - p->len; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7ddeb99c9d..5292bad209 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -61,6 +61,7 @@ typedef struct SWorkThrdObj { SAsyncPool* asyncPool; // uv_async_t* workerAsync; // queue msg; + queue conn; pthread_mutex_t msgMtx; void* pTransInst; } SWorkThrdObj; @@ -103,7 +104,7 @@ static void uvStartSendResp(SSrvMsg* msg); static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static bool readComplete(SConnBuffer* buf); -static SSrvConn* createConn(); +static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void uvDestroyConn(uv_handle_t* handle); @@ -117,11 +118,6 @@ static bool addHandleToWorkloop(void* arg); static bool addHandleToAcceptloop(void* arg); void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - /* - * formate of data buffer: - * |<--------------------------data from socket------------------------------->| - * |<------STransMsgHead------->|<-------------------other data--------------->| - */ SSrvConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); @@ -131,23 +127,27 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later - STransMsgHead head; - int32_t headLen = sizeof(head); - if (data->len >= headLen) { - memcpy((char*)&head, data->buf, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); - if (msgLen > data->len) { - data->left = msgLen - data->len; - return false; - } else if (msgLen == data->len) { - return true; - } else if (msgLen < data->len) { - return false; - // handle other packet later - } - } else { - return false; - } + if (data->len == data->cap && data->total == data->cap) { + return true; + } + return false; + // STransMsgHead head; + // int32_t headLen = sizeof(head); + // if (data->len >= headLen) { + // memcpy((char*)&head, data->buf, headLen); + // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + // if (msgLen > data->len) { + // data->left = msgLen - data->len; + // return false; + // } else if (msgLen == data->len) { + // return true; + // } else if (msgLen < data->len) { + // return false; + // // handle other packet later + // } + //} else { + // return false; + //} } // static void uvDoProcess(SRecvInfo* pRecv) { @@ -241,7 +241,7 @@ static void uvHandleReq(SSrvConn* pConn) { } pConn->inType = pHead->msgType; - assert(transIsReq(pHead->msgType)); + // assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; pHead->code = htonl(pHead->code); @@ -266,9 +266,9 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->ref++; - tDebug("server conn %p %s received from %s:%d, local info: %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -290,6 +290,14 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } + if (nread == UV_EOF) { + tError("server conn %p read error: %s", conn, uv_err_name(nread)); + if (conn->ref > 1) { + conn->ref++; // ref > 1 signed that write is in progress + } + destroyConn(conn, true); + return; + } if (nread == 0) { return; } @@ -302,8 +310,8 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(sizeof(char)); buf->len = 2; + buf->base = calloc(1, sizeof(char) * buf->len); } void uvOnTimeoutCb(uv_timer_t* handle) { @@ -386,6 +394,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { static void uvStartSendResp(SSrvMsg* smsg) { // impl SSrvConn* pConn = smsg->pConn; + pConn->ref--; // if (taosArrayGetSize(pConn->srvMsgs) > 0) { tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); @@ -403,6 +412,16 @@ static void destroySmsg(SSrvMsg* smsg) { transFreeMsg(smsg->msg.pCont); free(smsg); } +static void destroyAllConn(SWorkThrdObj* pThrd) { + while (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* h = QUEUE_HEAD(&pThrd->conn); + QUEUE_REMOVE(h); + QUEUE_INIT(h); + + SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); + destroyConn(c, true); + } +} void uvWorkerAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SWorkThrdObj* pThrd = item->pThrd; @@ -424,8 +443,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) { continue; } if (msg->pConn == NULL) { - // free(msg); + + destroyAllConn(pThrd); uv_stop(pThrd->loop); } else { uvStartSendResp(msg); @@ -439,6 +459,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; + uv_close((uv_handle_t*)&srv->server, NULL); uv_stop(srv->loop); } @@ -491,7 +512,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SSrvConn* pConn = createConn(); + SSrvConn* pConn = createConn(pThrd); pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ @@ -507,6 +528,9 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; + uv_tcp_nodelay(pConn->pTcp, 1); + uv_tcp_keepalive(pConn->pTcp, 1, 1); + // init write request, just pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter->data = pConn; @@ -560,6 +584,9 @@ static bool addHandleToWorkloop(void* arg) { QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); + // conn set + QUEUE_INIT(&pThrd->conn); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; @@ -598,8 +625,13 @@ void* workerThread(void* arg) { uv_run(pThrd->loop, UV_RUN_DEFAULT); } -static SSrvConn* createConn() { +static SSrvConn* createConn(void* hThrd) { + SWorkThrdObj* pThrd = hThrd; + SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); + QUEUE_INIT(&pConn->queue); + + QUEUE_PUSH(&pThrd->conn, &pConn->queue); pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); ++pConn->ref; @@ -610,7 +642,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - tTrace("server conn %p try to destroy", conn); + tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref); if (--conn->ref > 0) { return; } @@ -621,19 +653,18 @@ static void destroyConn(SSrvConn* conn, bool clear) { destroySmsg(msg); } taosArrayDestroy(conn->srvMsgs); - - // destroySmsg(conn->pSrvMsg); - // conn->pSrvMsg = NULL; + QUEUE_REMOVE(&conn->queue); if (clear) { - uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); + uv_tcp_close_reset(conn->pTcp, uvDestroyConn); + // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } } static void uvDestroyConn(uv_handle_t* handle) { SSrvConn* conn = handle->data; tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); - free(conn->pTimer); + // free(conn->pTimer); // free(conn->pTcp); free(conn->pWriter); free(conn); diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index 7e542ef80f..bb526e0ba0 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -22,13 +22,12 @@ * windows implementation */ - -#include #include -#include +#include #include +#include -#pragma warning( disable : 4244 ) +#pragma warning(disable : 4244) typedef void (*win_timer_f)(int signo); @@ -40,8 +39,8 @@ void WINAPI taosWinOnTimer(UINT wTimerID, UINT msg, DWORD_PTR dwUser, DWORD_PTR } static MMRESULT timerId; -int taosInitTimer(win_timer_f callback, int ms) { - DWORD_PTR param = *((int64_t *) & callback); +int taosInitTimer(win_timer_f callback, int ms) { + DWORD_PTR param = *((int64_t *)&callback); timerId = timeSetEvent(ms, 1, (LPTIMECALLBACK)taosWinOnTimer, param, TIME_PERIODIC); if (timerId == 0) { @@ -50,9 +49,7 @@ int taosInitTimer(win_timer_f callback, int ms) { return 0; } -void taosUninitTimer() { - timeKillEvent(timerId); -} +void taosUninitTimer() { timeKillEvent(timerId); } #elif defined(_TD_DARWIN_64) @@ -60,32 +57,32 @@ void taosUninitTimer() { * darwin implementation */ -#include #include +#include #include static void (*timer_callback)(int); -static int timer_ms = 0; -static pthread_t timer_thread; -static int timer_kq = -1; -static volatile int timer_stop = 0; +static int timer_ms = 0; +static pthread_t timer_thread; +static int timer_kq = -1; +static volatile int timer_stop = 0; -static void* timer_routine(void *arg) { +static void* timer_routine(void* arg) { (void)arg; setThreadName("timer"); - int r = 0; + int r = 0; struct timespec to = {0}; - to.tv_sec = timer_ms / 1000; - to.tv_nsec = (timer_ms % 1000) * 1000000; + to.tv_sec = timer_ms / 1000; + to.tv_nsec = (timer_ms % 1000) * 1000000; while (!timer_stop) { struct kevent64_s kev[10] = {0}; - r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev)/sizeof(kev[0]), 0, &to); - if (r!=0) { + r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), 0, &to); + if (r != 0) { fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__); abort(); } - timer_callback(SIGALRM); // just mock + timer_callback(SIGALRM); // just mock } return NULL; @@ -93,11 +90,13 @@ static void* timer_routine(void *arg) { int taosInitTimer(void (*callback)(int), int ms) { int r = 0; - timer_ms = ms; + timer_kq = -1; + timer_stop = 0; + timer_ms = ms; timer_callback = callback; timer_kq = kqueue(); - if (timer_kq==-1) { + if (timer_kq == -1) { fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__); // since no caller of this func checks the return value for the moment abort(); @@ -144,10 +143,10 @@ static void taosDeleteTimer(void *tharg) { timer_delete(*pTimer); } -static pthread_t timerThread; -static timer_t timerId; +static pthread_t timerThread; +static timer_t timerId; static volatile bool stopTimer = false; -static void *taosProcessAlarmSignal(void *tharg) { +static void * taosProcessAlarmSignal(void *tharg) { // Block the signal sigset_t sigset; sigemptyset(&sigset); @@ -159,18 +158,18 @@ static void *taosProcessAlarmSignal(void *tharg) { setThreadName("tmr"); - #ifdef _ALPINE - sevent.sigev_notify = SIGEV_THREAD; - sevent.sigev_value.sival_int = syscall(__NR_gettid); - #else - sevent.sigev_notify = SIGEV_THREAD_ID; - sevent._sigev_un._tid = syscall(__NR_gettid); - #endif - +#ifdef _ALPINE + sevent.sigev_notify = SIGEV_THREAD; + sevent.sigev_value.sival_int = syscall(__NR_gettid); +#else + sevent.sigev_notify = SIGEV_THREAD_ID; + sevent._sigev_un._tid = syscall(__NR_gettid); +#endif + sevent.sigev_signo = SIGALRM; if (timer_create(CLOCK_REALTIME, &sevent, &timerId) == -1) { - //printf("Failed to create timer"); + // printf("Failed to create timer"); } pthread_cleanup_push(taosDeleteTimer, &timerId); @@ -182,36 +181,37 @@ static void *taosProcessAlarmSignal(void *tharg) { ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK; if (timer_settime(timerId, 0, &ts, NULL)) { - //printf("Failed to init timer"); + // printf("Failed to init timer"); return NULL; } int signo; while (!stopTimer) { if (sigwait(&sigset, &signo)) { - //printf("Failed to wait signal: number %d", signo); + // printf("Failed to wait signal: number %d", signo); continue; } /* //printf("Signal handling: number %d ......\n", signo); */ callback(0); } - + pthread_cleanup_pop(1); return NULL; } int taosInitTimer(void (*callback)(int), int ms) { + stopTimer = false; pthread_attr_t tattr; pthread_attr_init(&tattr); int code = pthread_create(&timerThread, &tattr, taosProcessAlarmSignal, callback); pthread_attr_destroy(&tattr); if (code != 0) { - //printf("failed to create timer thread"); + // printf("failed to create timer thread"); return -1; } else { - //printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread)); + // printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread)); } return 0; @@ -220,7 +220,7 @@ int taosInitTimer(void (*callback)(int), int ms) { void taosUninitTimer() { stopTimer = true; - //printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread)); + // printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread)); pthread_join(timerThread, NULL); } diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 1fdc2257d7..65101a5e07 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -13,19 +13,49 @@ * along with this program. If not, see . */ +#include "ttimer.h" #include "os.h" +#include "taoserror.h" #include "tlog.h" #include "tsched.h" -#include "ttimer.h" #include "tutil.h" -#include "taoserror.h" -#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} +#define tmrFatal(...) \ + { \ + if (tmrDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrError(...) \ + { \ + if (tmrDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrWarn(...) \ + { \ + if (tmrDebugFlag & DEBUG_WARN) { \ + taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrInfo(...) \ + { \ + if (tmrDebugFlag & DEBUG_INFO) { \ + taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrDebug(...) \ + { \ + if (tmrDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrTrace(...) \ + { \ + if (tmrDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } #define TIMER_STATE_WAITING 0 #define TIMER_STATE_EXPIRED 1 @@ -81,7 +111,7 @@ typedef struct time_wheel_t { tmr_obj_t** slots; } time_wheel_t; -int32_t tmrDebugFlag = 131; +int32_t tmrDebugFlag = 131; uint32_t tsMaxTmrCtrl = 512; static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT; @@ -91,7 +121,7 @@ static tmr_ctrl_t* unusedTmrCtrl = NULL; static void* tmrQhandle; static int numOfTmrCtrl = 0; -int taosTmrThreads = 1; +int taosTmrThreads = 1; static uintptr_t nextTimerId = 0; static time_wheel_t wheels[] = { @@ -119,7 +149,7 @@ static void timerDecRef(tmr_obj_t* timer) { static void lockTimerList(timer_list_t* list) { int64_t tid = taosGetSelfPthreadId(); - int i = 0; + int i = 0; while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) { if (++i % 1000 == 0) { sched_yield(); @@ -276,11 +306,11 @@ static void addToExpired(tmr_obj_t* head) { const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue."; while (head != NULL) { - uintptr_t id = head->id; + uintptr_t id = head->id; tmr_obj_t* next = head->next; tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param); - SSchedMsg schedMsg; + SSchedMsg schedMsg; schedMsg.fp = NULL; schedMsg.tfp = processExpiredTimer; schedMsg.msg = NULL; @@ -491,6 +521,8 @@ static void taosTmrModuleInit(void) { return; } + memset(&timerMap, 0, sizeof(timerMap)); + for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) { tmr_ctrl_t* ctrl = tmrCtrls + i; ctrl->next = ctrl + 1; @@ -570,7 +602,8 @@ void taosTmrCleanUp(void* handle) { unusedTmrCtrl = ctrl; pthread_mutex_unlock(&tmrCtrlMutex); - if (numOfTmrCtrl <=0) { + tmrDebug("time controller's tmr ctrl size: %d", numOfTmrCtrl); + if (numOfTmrCtrl <= 0) { taosUninitTimer(); taosCleanUpScheduler(tmrQhandle); @@ -585,7 +618,7 @@ void taosTmrCleanUp(void* handle) { for (size_t i = 0; i < timerMap.size; i++) { timer_list_t* list = timerMap.slots + i; - tmr_obj_t* t = list->timers; + tmr_obj_t* t = list->timers; while (t != NULL) { tmr_obj_t* next = t->mnext; free(t); @@ -595,6 +628,8 @@ void taosTmrCleanUp(void* handle) { free(timerMap.slots); free(tmrCtrls); - tmrDebug("timer module is cleaned up"); + tmrCtrls = NULL; + unusedTmrCtrl = NULL; + tmrModuleInit = PTHREAD_ONCE_INIT; // to support restart } } -- GitLab