diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 36b1e41978e94d38d1b3343bda47e8b3caf49904..7e7afc97740f512e4afc8da2e620548bf367a7c9 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -158,7 +158,7 @@ typedef enum { SDB_USER = 5, SDB_AUTH = 6, SDB_ACCT = 7, - SDB_VGROUP = 9, + SDB_VGROUP = 8, SDB_STB = 9, SDB_DB = 10, SDB_FUNC = 11, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b385db181f1b9d0c2ecccab91d582e79f5cafb23..7e71c6bfc7fec62ced2ff3ab763e7d0f66fddd14 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -46,7 +46,7 @@ int64_t tsDnodeStartTime = 0; // common int32_t tsRpcTimer = 300; int32_t tsRpcMaxTime = 600; // seconds; -int32_t tsRpcForceTcp = 0; //disable this, means query, show command use udp protocol as default +int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default int32_t tsMaxShellConns = 50000; int32_t tsMaxConnections = 5000; int32_t tsShellActivityTimer = 3; // second @@ -1583,7 +1583,7 @@ static void doInitGlobalConfig(void) { taosInitConfigOption(cfg); assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); #else - assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); + //assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); #endif } diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index c21c6a0b860514ce815f4628d1cfc946d68b3123..27cc99c27cb0f5206674215a107838d8a3b3a7ce 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -31,6 +31,7 @@ int32_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); +void dndSendStatusMsg(SDnode *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index b948dc8ce4bb92c92ded94c345b15e3e05033ccf..2cdebab6bf519c89b738ff97f99b935263edd0e7 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -335,7 +335,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { return 0; } -static void dndSendStatusMsg(SDnode *pDnode) { +void dndSendStatusMsg(SDnode *pDnode) { int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SStatusMsg *pStatus = rpcMallocCont(contLen); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 09c207bac73d7eb7362bd0ef40996bb7418bac69..245a1e41f6e5a5d65a2ee5513992286ccc9e81bf 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -130,7 +130,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle); + dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); rpcFreeCont(pMsg->pCont); return; } @@ -138,10 +138,9 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { (*fp)(pDnode, pMsg, pEpSet); - dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle, - pMsg->code & 0XFFFF); + dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); } else { - dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle); + dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); } rpcFreeCont(pMsg->pCont); } diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 90e8fe7d54b4e240258ec57e265ab75c90f8bc5d..a5b118e67be7656e315e25e0426daf45e217f613 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -194,6 +194,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { } dndSetStat(pDnode, DND_STAT_RUNNING); + dndSendStatusMsg(pDnode); dndReportStartup(pDnode, "TDengine", "initialized successfully"); dInfo("TDengine is initialized successfully"); diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index 1632438b5c746906ba829751110bc3b981ddf3da..d2a242d7974fc983c1b330d61d537b5bb56e1865 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -80,6 +80,8 @@ TEST_F(DndTestDnode, ShowDnode) { sendMsg(pClient, &showRpcMsg); ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; ASSERT_NE(pShowRsp, nullptr); @@ -170,6 +172,7 @@ TEST_F(DndTestDnode, ShowDnode) { sendMsg(pClient, &retrieveRpcMsg); ASSERT_NE(pClient->pRsp, nullptr); ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; ASSERT_NE(pRetrieveRsp, nullptr); diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index d0431313cd753877dd65bb53fb582ffe92f8c34f..0048b7c4adf7ed4409dd64893f3fb92e5e321b80 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -16,7 +16,7 @@ #include "deploy.h" void initLog(const char* path) { - dDebugFlag = 0; + dDebugFlag = 207; vDebugFlag = 0; mDebugFlag = 207; cDebugFlag = 0; @@ -90,6 +90,7 @@ SServer* createServer(const char* path, const char* fqdn, uint16_t port, const c } void dropServer(SServer* pServer) { + if (pServer == NULL) return; if (pServer->threadId != NULL) { taosDestoryThread(pServer->threadId); } @@ -98,6 +99,8 @@ void dropServer(SServer* pServer) { void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SClient* pClient = (SClient*)parent; pClient->pRsp = pMsg; + uInfo("response:%s from dnode, pCont:%p contLen:%d code:0x%X", taosMsg[pMsg->msgType], pMsg->pCont, pMsg->contLen, + pMsg->code); tsem_post(&pClient->sem); } @@ -143,7 +146,7 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) { epSet.inUse = 0; epSet.numOfEps = 1; epSet.port[0] = pClient->port; - strcpy(epSet.fqdn[0], pClient->fqdn); + memcpy(epSet.fqdn[0], pClient->fqdn, TSDB_FQDN_LEN); rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL); tsem_wait(&pClient->sem); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index abd0d6a96b9f832636f489cc372d2a3177f1f95f..44d13d1fb498cb41a87fb69ab072d2e26be9895e 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -45,7 +45,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_VGROUP, - .keyType = SDB_KEY_BINARY, + .keyType = SDB_KEY_INT32, .encodeFp = (SdbEncodeFp)mndVgroupActionEncode, .decodeFp = (SdbDecodeFp)mndVgroupActionDecode, .insertFp = (SdbInsertFp)mndVgroupActionInsert, diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index ce110ede321163f958cce4cb74f1106f3648324c..fb69a74876c31ef41027dd1958c41d378a8fbbe1 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -126,6 +126,8 @@ typedef struct SRpcConn { SRpcReqContext *pContext; // request context } SRpcConn; +static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT; + int tsRpcMaxUdpSize = 15000; // bytes int tsProgressTimer = 100; // not configurable @@ -220,17 +222,22 @@ static void rpcFree(void *p) { free(p); } -int32_t rpcInit(void) { - tsProgressTimer = tsRpcTimer/2; - tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; - tsRpcHeadSize = RPC_MSG_OVERHEAD; +static void rpcInitImp(void) { + tsProgressTimer = tsRpcTimer / 2; + tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer; + tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcOverhead = sizeof(SRpcReqContext); tsRpcRefId = taosOpenRef(200, rpcFree); return 0; } - + +int32_t rpcInit(void) { + pthread_once(&tsRpcInitOnce, rpcInitImp); + return 0; +} + void rpcCleanup(void) { taosCloseRef(tsRpcRefId); tsRpcRefId = -1;