From 6941b148780303140d701abe42b1710ec561f8c4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 12 Dec 2021 20:43:29 +0800 Subject: [PATCH] TD-10431 fix crash while test dnode create --- include/dnode/mnode/sdb/sdb.h | 2 +- source/common/src/tglobal.c | 4 ++-- source/dnode/mgmt/impl/inc/dndDnode.h | 1 + source/dnode/mgmt/impl/src/dndDnode.c | 2 +- source/dnode/mgmt/impl/src/dndTransport.c | 7 +++---- source/dnode/mgmt/impl/src/dnode.c | 1 + source/dnode/mgmt/impl/test/dnode/dnode.cpp | 3 +++ source/dnode/mgmt/impl/test/sut/deploy.cpp | 7 +++++-- source/dnode/mnode/impl/src/mndVgroup.c | 2 +- source/libs/transport/src/rpcMain.c | 17 ++++++++++++----- 10 files changed, 30 insertions(+), 16 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 36b1e41978..7e7afc9774 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -158,7 +158,7 @@ typedef enum { SDB_USER = 5, SDB_AUTH = 6, SDB_ACCT = 7, - SDB_VGROUP = 9, + SDB_VGROUP = 8, SDB_STB = 9, SDB_DB = 10, SDB_FUNC = 11, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b385db181f..7e71c6bfc7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -46,7 +46,7 @@ int64_t tsDnodeStartTime = 0; // common int32_t tsRpcTimer = 300; int32_t tsRpcMaxTime = 600; // seconds; -int32_t tsRpcForceTcp = 0; //disable this, means query, show command use udp protocol as default +int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default int32_t tsMaxShellConns = 50000; int32_t tsMaxConnections = 5000; int32_t tsShellActivityTimer = 3; // second @@ -1583,7 +1583,7 @@ static void doInitGlobalConfig(void) { taosInitConfigOption(cfg); assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); #else - assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); + //assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); #endif } diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index c21c6a0b86..27cc99c27c 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -31,6 +31,7 @@ int32_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); +void dndSendStatusMsg(SDnode *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index b948dc8ce4..2cdebab6bf 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -335,7 +335,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { return 0; } -static void dndSendStatusMsg(SDnode *pDnode) { +void dndSendStatusMsg(SDnode *pDnode) { int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SStatusMsg *pStatus = rpcMallocCont(contLen); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 09c207bac7..245a1e41f6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -130,7 +130,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle); + dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); rpcFreeCont(pMsg->pCont); return; } @@ -138,10 +138,9 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { (*fp)(pDnode, pMsg, pEpSet); - dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle, - pMsg->code & 0XFFFF); + dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); } else { - dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle); + dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); } rpcFreeCont(pMsg->pCont); } diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 90e8fe7d54..a5b118e67b 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -194,6 +194,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { } dndSetStat(pDnode, DND_STAT_RUNNING); + dndSendStatusMsg(pDnode); dndReportStartup(pDnode, "TDengine", "initialized successfully"); dInfo("TDengine is initialized successfully"); diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index 1632438b5c..d2a242d797 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -80,6 +80,8 @@ TEST_F(DndTestDnode, ShowDnode) { sendMsg(pClient, &showRpcMsg); ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; ASSERT_NE(pShowRsp, nullptr); @@ -170,6 +172,7 @@ TEST_F(DndTestDnode, ShowDnode) { sendMsg(pClient, &retrieveRpcMsg); ASSERT_NE(pClient->pRsp, nullptr); ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; ASSERT_NE(pRetrieveRsp, nullptr); diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index d0431313cd..0048b7c4ad 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -16,7 +16,7 @@ #include "deploy.h" void initLog(const char* path) { - dDebugFlag = 0; + dDebugFlag = 207; vDebugFlag = 0; mDebugFlag = 207; cDebugFlag = 0; @@ -90,6 +90,7 @@ SServer* createServer(const char* path, const char* fqdn, uint16_t port, const c } void dropServer(SServer* pServer) { + if (pServer == NULL) return; if (pServer->threadId != NULL) { taosDestoryThread(pServer->threadId); } @@ -98,6 +99,8 @@ void dropServer(SServer* pServer) { void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SClient* pClient = (SClient*)parent; pClient->pRsp = pMsg; + uInfo("response:%s from dnode, pCont:%p contLen:%d code:0x%X", taosMsg[pMsg->msgType], pMsg->pCont, pMsg->contLen, + pMsg->code); tsem_post(&pClient->sem); } @@ -143,7 +146,7 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) { epSet.inUse = 0; epSet.numOfEps = 1; epSet.port[0] = pClient->port; - strcpy(epSet.fqdn[0], pClient->fqdn); + memcpy(epSet.fqdn[0], pClient->fqdn, TSDB_FQDN_LEN); rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL); tsem_wait(&pClient->sem); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index abd0d6a96b..44d13d1fb4 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -45,7 +45,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_VGROUP, - .keyType = SDB_KEY_BINARY, + .keyType = SDB_KEY_INT32, .encodeFp = (SdbEncodeFp)mndVgroupActionEncode, .decodeFp = (SdbDecodeFp)mndVgroupActionDecode, .insertFp = (SdbInsertFp)mndVgroupActionInsert, diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index ce110ede32..fb69a74876 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -126,6 +126,8 @@ typedef struct SRpcConn { SRpcReqContext *pContext; // request context } SRpcConn; +static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT; + int tsRpcMaxUdpSize = 15000; // bytes int tsProgressTimer = 100; // not configurable @@ -220,17 +222,22 @@ static void rpcFree(void *p) { free(p); } -int32_t rpcInit(void) { - tsProgressTimer = tsRpcTimer/2; - tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; - tsRpcHeadSize = RPC_MSG_OVERHEAD; +static void rpcInitImp(void) { + tsProgressTimer = tsRpcTimer / 2; + tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer; + tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcOverhead = sizeof(SRpcReqContext); tsRpcRefId = taosOpenRef(200, rpcFree); return 0; } - + +int32_t rpcInit(void) { + pthread_once(&tsRpcInitOnce, rpcInitImp); + return 0; +} + void rpcCleanup(void) { taosCloseRef(tsRpcRefId); tsRpcRefId = -1; -- GitLab