diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 3b9af741c39d9f5dcae6031f6f120ba56df9c972..0d4da965e2c1778b5d7e55cf7becf0bf46e5b681 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() { continue; } - mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes); - mnodeAddMnode(pDnode->dnodeId); + mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes); + mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true); - numOfMnodes = mnodeGetMnodesNum(); - if (numOfMnodes >= tsNumOfMnodes) return; + // Only create one mnode each time + return; } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index acc38ac8ed5be82d0c0df19445e98bb47128589f..449a92abcb0dce600eac530354d39144d7ae1bac 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -20,6 +20,7 @@ #include "tcache.h" #include "tnote.h" #include "trpc.h" +#include "ttimer.h" #include "tscLog.h" #include "tscSubquery.h" #include "tscUtil.h" @@ -260,6 +261,9 @@ void taos_close(TAOS *taos) { return; } + pObj->signature = NULL; + taosTmrStopA(&(pObj->pTimer)); + SSqlObj* pHb = pObj->pHb; if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 82ed7dd1793882c15929aa0899e04357a99359bb..8414d79a9815287dfdfae3af5cc123304745c56d 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -33,7 +33,8 @@ typedef struct { } SMPeerWorker; typedef struct { - int32_t num; + int32_t curNum; + int32_t maxNum; SMPeerWorker *peerWorker; } SMPeerWorkerPool; @@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param); int32_t dnodeInitMnodePeer() { tsMPeerQset = taosOpenQset(); - tsMPeerPool.num = 1; - tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.num); + tsMPeerPool.maxNum = 1; + tsMPeerPool.curNum = 0; + tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum); if (tsMPeerPool.peerWorker == NULL) return -1; - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; pWorker->workerId = i; + dDebug("dnode mpeer worker:%d is created", i); } - dInfo("dnode mpeer is opened"); + dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset); return 0; } void dnodeCleanupMnodePeer() { - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; if (pWorker->thread) { taosQsetThreadResume(tsMPeerQset); } + dDebug("dnode mpeer worker:%d is closed", i); } - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; + dDebug("dnode mpeer worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } + dDebug("dnode mpeer worker:%d join success", i); } + dDebug("dnode mpeer is closed, qset:%p", tsMPeerQset); + taosCloseQset(tsMPeerQset); + tsMPeerQset = NULL; taosTFree(tsMPeerPool.peerWorker); - dInfo("dnode mpeer is closed"); } int32_t dnodeAllocateMnodePqueue() { @@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() { taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL); - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; pWorker->workerId = i; @@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() { } pthread_attr_destroy(&thAttr); - dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num); + + tsMPeerPool.curNum = i + 1; + dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum); } dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue); @@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() { } void dnodeFreeMnodePqueue() { + dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue); taosCloseQueue(tsMPeerQueue); tsMPeerQueue = NULL; } @@ -148,7 +159,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { while (1) { if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) { - dDebug("dnodeProcessMnodePeerQueue: got no message from qset, exiting..."); + dDebug("qset:%p, mnode peer got no message from qset, exiting", tsMPeerQset); break; } diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 6e610d8498ad4776d0eb02098129d6b87992ec0a..fdcbb5889f766ddebdb3f1e56ccffa0b5b129552 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -33,7 +33,8 @@ typedef struct { } SMReadWorker; typedef struct { - int32_t num; + int32_t curNum; + int32_t maxNum; SMReadWorker *readWorker; } SMReadWorkerPool; @@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param); int32_t dnodeInitMnodeRead() { tsMReadQset = taosOpenQset(); - tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2; - tsMReadPool.num = MAX(2, tsMReadPool.num); - tsMReadPool.num = MIN(4, tsMReadPool.num); - tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num); + tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2; + tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum); + tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum); + tsMReadPool.curNum = 0; + tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum); if (tsMReadPool.readWorker == NULL) return -1; - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; pWorker->workerId = i; + dDebug("dnode mread worker:%d is created", i); } - dInfo("dnode mread is opened"); + dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset); return 0; } void dnodeCleanupMnodeRead() { - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; if (pWorker->thread) { taosQsetThreadResume(tsMReadQset); } + dDebug("dnode mread worker:%d is closed", i); } - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; + dDebug("dnode mread worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } + dDebug("dnode mread worker:%d start to join", i); } + dDebug("dnode mread is closed, qset:%p", tsMReadQset); + taosCloseQset(tsMReadQset); + tsMReadQset = NULL; free(tsMReadPool.readWorker); - - dInfo("dnode mread is closed"); } int32_t dnodeAllocateMnodeRqueue() { @@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() { taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL); - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; pWorker->workerId = i; @@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() { } pthread_attr_destroy(&thAttr); - dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num); + tsMReadPool.curNum = i + 1; + dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum); } dDebug("dnode mread queue:%p is allocated", tsMReadQueue); @@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() { } void dnodeFreeMnodeRqueue() { + dDebug("dnode mread queue:%p is freed", tsMReadQueue); taosCloseQueue(tsMReadQueue); tsMReadQueue = NULL; } @@ -156,7 +165,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) { while (1) { if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) { - dDebug("dnodeProcessMnodeReadQueue: got no message from qset, exiting..."); + dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset); break; } diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 0a305b5598161447bd1e94f0d8c7202389a2f724..384a0fae75088197d7fb01dd67c6e1b9d38739cd 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -34,7 +34,8 @@ typedef struct { } SMWriteWorker; typedef struct { - int32_t num; + int32_t curNum; + int32_t maxNum; SMWriteWorker *writeWorker; } SMWriteWorkerPool; @@ -47,38 +48,45 @@ static void *dnodeProcessMnodeWriteQueue(void *param); int32_t dnodeInitMnodeWrite() { tsMWriteQset = taosOpenQset(); - - tsMWritePool.num = 1; - tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num); + + tsMWritePool.maxNum = 1; + tsMWritePool.curNum = 0; + tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum); if (tsMWritePool.writeWorker == NULL) return -1; - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; pWorker->workerId = i; + dDebug("dnode mwrite worker:%d is created", i); } - dInfo("dnode mwrite is opened"); + dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset); return 0; } void dnodeCleanupMnodeWrite() { - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; if (pWorker->thread) { taosQsetThreadResume(tsMWriteQset); } + dDebug("dnode mwrite worker:%d is closed", i); } - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + dDebug("dnode mwrite worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } + dDebug("dnode mwrite worker:%d join success", i); } + dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset); + taosCloseQset(tsMWriteQset); + tsMWriteQset = NULL; taosTFree(tsMWritePool.writeWorker); - dInfo("dnode mwrite is closed"); } int32_t dnodeAllocateMnodeWqueue() { @@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() { taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL); - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; pWorker->workerId = i; @@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() { } pthread_attr_destroy(&thAttr); - dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num); + tsMWritePool.curNum = i + 1; + dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum); } dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue); @@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() { } void dnodeFreeMnodeWqueue() { + dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue); taosCloseQueue(tsMWriteQueue); tsMWriteQueue = NULL; } @@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); mnodeCreateMsg(pWrite, pMsg); - dDebug("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]); + dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { + dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); + mnodeCleanupMsg(pWrite); taosFreeQitem(pWrite); } @@ -158,7 +172,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { while (1) { if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) { - dDebug("dnodeProcessMnodeWriteQueue: got no message from qset, exiting..."); + dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset); break; } @@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) { dnodeSendRedirectMsg(pMsg, true); dnodeFreeMnodeWriteMsg(pWrite); } else { - dDebug("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, - taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); + dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 8f2b687dc4df129d508ba0a1515d5b00d41a57c1..968a8d9759e5618753996476b40efc3be77f7925 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg; dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); dnodeReadDnodeCfg(); @@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) { while (1) { if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { - dDebug("dnode mgmt got no message from qset, exit ..."); + dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); break; } @@ -451,10 +453,34 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { } static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { - SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; + SMDCfgDnodeMsg *pCfg = pMsg->pCont; return taosCfgDynamicOptions(pCfg->config); } +static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { + SMDCreateMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + if (pCfg->dnodeId != dnodeGetDnodeId()) { + dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); + return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + } + + if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { + dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); + return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; + } + + dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum); + for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) { + pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId); + dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp); + } + + dnodeStartMnode(&pCfg->mnodes); + + return TSDB_CODE_SUCCESS; +} + void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { if (pEpSet->numOfEps <= 0) { dError("mnode EP list for peer is changed, but content is invalid, discard it"); @@ -465,29 +491,6 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { for (int i = 0; i < pEpSet->numOfEps; ++i) { pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); - - if (!mnodeIsRunning()) { - if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) { - dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]); - bool find = false; - for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) { - if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) { - dInfo("localEp found in mnode infos"); - find = true; - break; - } - } - - if (!find) { - dInfo("localEp not found in mnode infos, will set into mnode infos"); - tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN); - tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId(); - tsDMnodeInfos.nodeNum++; - } - - dnodeStartMnode(); - } - } } tsDMnodeEpSet = *pEpSet; @@ -532,7 +535,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { } vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); - dnodeProcessModuleStatus(pCfg->moduleStatus); + + // will not set mnode in status msg + // dnodeProcessModuleStatus(pCfg->moduleStatus); dnodeUpdateDnodeCfg(pCfg); dnodeUpdateMnodeInfos(pMnodes); @@ -576,7 +581,7 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { } dnodeSaveMnodeInfos(); - sdbUpdateSync(); + sdbUpdateAsync(); } static bool dnodeReadMnodeInfos() { diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index ba7cdf2664cb641047f5651920925808e5ec530e..001c73eb3946fe46984c25758d9e0ac2a678dd37 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -146,7 +146,9 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { } } -bool dnodeStartMnode() { +bool dnodeStartMnode(void *pMnodes) { + SDMMnodeInfos *mnodes = pMnodes; + if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { dDebug("mnode module is already started, module status:%d", tsModuleStatus); return false; @@ -156,6 +158,7 @@ bool dnodeStartMnode() { dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); dnodeProcessModuleStatus(moduleStatus); - sdbUpdateSync(); + sdbUpdateSync(mnodes); + return true; } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index c09d7422396b4525945ae0bb1f32d77d82ac9903..3bc2f7b48b319f3c9e6215a05463ebedc74035fa 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -48,6 +48,7 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; @@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); } -void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { +void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); } + +void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { + rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp); +} \ No newline at end of file diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 4a1d337824e3a7879c651b8db3a3465e526c9299..a5c5d4759bbabf054de4b933c475b1cd2fd26e90 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; - dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); @@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE; SRpcMsg rpcRsp = {0}; - dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); terrno = rpcRsp.code; if (rpcRsp.code != 0) { diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index fb4ffcdafa3847d1583a58b773738148b53e2c1d..1a3d0ebc27e432d5038301908514f941cc0dfb18 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) { while (1) { if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { - dDebug("dnodeProcessReadQueee: got no message from qset, exiting..."); + dDebug("qset:%p dnode read got no message from qset, exiting", readQset); break; } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 51bc8890fcd98b6f5dee4c88d2858990e68bc2a3..3f2c9df22216c6d062472231bb4be8a585208c3a 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) { while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); if (numOfMsgs == 0) { - dDebug("dnodeProcessWriteQueee: got no message from qset, exiting..."); + dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset); break; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 017241c4f869dcc450dd552c922e72446c8922f7..83d2a4ad9c58a77510aacf076296213779327326 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -43,11 +43,12 @@ void dnodeGetMnodeEpSetForPeer(void *epSet); void dnodeGetMnodeEpSetForShell(void *epSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); -bool dnodeStartMnode(); +bool dnodeStartMnode(void *pModes); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); -void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); void *dnodeAllocateVnodeWqueue(void *pVnode); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 03a25e0f92b881b6b1b9bcfcf6605257b776ae1b..5bef7402e30a0cc9f1964cc2430e0b1f5141590c 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -60,7 +60,8 @@ int32_t mnodeInitSystem(); int32_t mnodeStartSystem(); void mnodeCleanupSystem(); void mnodeStopSystem(); -void sdbUpdateSync(); +void sdbUpdateAsync(); +void sdbUpdateSync(void *pMnodes); bool mnodeIsRunning(); int32_t mnodeProcessRead(SMnodeMsg *pMsg); int32_t mnodeProcessWrite(SMnodeMsg *pMsg); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index b5d22ea80c2c0a9f338b537c3452929d0141e6e5..786342b5a6d23ddcd1f72cd73680c7ab9b5f48f5 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 50b31a86ccd743b0e56c85e50628f9574bfc8c8e..507ccf8d80f189954ffac3d3ae390d64cbc7fb1a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) @@ -719,6 +719,12 @@ typedef struct { char ep[TSDB_EP_LEN]; // end point, hostname:port } SCMCreateDnodeMsg, SCMDropDnodeMsg; +typedef struct { + int32_t dnodeId; + char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port + SDMMnodeInfos mnodes; +} SMDCreateMnodeMsg; + typedef struct { int32_t dnodeId; int32_t vgId; diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 0976ea8acd74e511a6cda374dc6ad5501b503482..a28a03ea4028a243a27e79688de54627aaeb7064 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -31,7 +31,7 @@ typedef enum { int32_t mnodeInitMnodes(); void mnodeCleanupMnodes(); -int32_t mnodeAddMnode(int32_t dnodeId); +void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm); int32_t mnodeDropMnode(int32_t dnodeId); void mnodeDropMnodeLocal(int32_t dnodeId); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 040bc259cc4096b9f4b86918e4793060c740d7e9..4c777e4eedba6694bf854e59a8760860b9f7c010 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() { mnodeCreateDnode(tsLocalEp, NULL); SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp); if (pDnode != NULL) { - mnodeAddMnode(pDnode->dnodeId); + mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false); mnodeDecDnodeRef(pDnode); } } diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index d63a5758686f5db46926e1b3c1bc97becd53d8ae..ea2ac9bf90ed6669c01402c78e11c23bd200dc3d 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -109,7 +109,7 @@ int32_t mnodeStartSystem() { mInfo("mnode is initialized successfully"); - sdbUpdateSync(); + sdbUpdateSync(NULL); return 0; } diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 8736e30217705ae37129a14dbf1a01693db77e92..68828ac24d8386594b16facb2d28755891952198 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -23,6 +23,8 @@ #include "tutil.h" #include "tsocket.h" #include "tdataformat.h" +#include "dnode.h" +#include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" #include "mnodeMnode.h" @@ -30,6 +32,7 @@ #include "mnodeSdb.h" #include "mnodeShow.h" #include "mnodeUser.h" +#include "mnodeVgroup.h" static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; @@ -266,25 +269,87 @@ void mnodeGetMnodeInfos(void *mnodeInfos) { mnodeMnodeUnLock(); } -int32_t mnodeAddMnode(int32_t dnodeId) { +static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { + mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp); + + SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg)); + if (pCreate == NULL) { + return TSDB_CODE_MND_OUT_OF_MEMORY; + } else { + pCreate->dnodeId = htonl(dnodeId); + tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); + pCreate->mnodes = tsMnodeInfos; + bool found = false; + for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) { + if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) { + found = true; + } + } + if (!found) { + pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId); + tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); + pCreate->mnodes.nodeNum++; + } + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pCreate; + rpcMsg.contLen = sizeof(SMDCreateMnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE; + + SRpcMsg rpcRsp = {0}; + SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp); + dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet); + + if (rpcRsp.code != TSDB_CODE_SUCCESS) { + mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code)); + } else { + mDebug("dnode:%d, create mnode msg is disposed, mnode is created in dnode", dnodeId); + } + + rpcFreeCont(rpcRsp.pCont); + return rpcRsp.code; +} + +static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + mError("failed to create mnode, reason:%s", tstrerror(code)); + } else { + mDebug("mnode is created successfully"); + mnodeUpdateMnodeEpSet(); + sdbUpdateAsync(); + } + + return code; +} + +void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); pMnode->mnodeId = dnodeId; pMnode->createdTime = taosGetTimestampMs(); SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsMnodeSdb, - .pObj = pMnode, + .pObj = pMnode, + .writeCb = mnodeCreateMnodeCb }; - int32_t code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - taosTFree(pMnode); + int32_t code = TSDB_CODE_SUCCESS; + if (needConfirm) { + code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp); } - mnodeUpdateMnodeEpSet(); + if (code != TSDB_CODE_SUCCESS) { + taosTFree(pMnode); + return; + } - return code; + code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code)); + taosTFree(pMnode); + } } void mnodeDropMnodeLocal(int32_t dnodeId) { @@ -296,6 +361,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { } mnodeUpdateMnodeEpSet(); + sdbUpdateAsync(); } int32_t mnodeDropMnode(int32_t dnodeId) { @@ -315,6 +381,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { sdbDecRef(tsMnodeSdb, pMnode); mnodeUpdateMnodeEpSet(); + sdbUpdateAsync(); return code; } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 8b368a33f4c7336178fcd949b729ca81fa660b4a..2a04f541c51483ccba93369a65061507f83ead23 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, + mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { - mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); + if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { + epSet->inUse = (i + 1) % epSet->numOfEps; + mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + } else { + mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + } } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index af2fed3408ab726b5a819f6717fab497fcba1136..93b944febb42d3c7e7bac113b5447ddd1488369c 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); mnodeGetMnodeEpSetForShell(epSet); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); + mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, + taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { - mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); + if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { + epSet->inUse = (i + 1) % epSet->numOfEps; + mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + } else { + mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + } } + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); + return TSDB_CODE_RPC_REDIRECT; } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 7654536122873ce3fbc37a300a87f767bd33d415..895aa0400a14d0e35030cf04b36e00cff2219f2c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -91,6 +91,7 @@ typedef struct { } SSdbWriteWorkerPool; extern void * tsMnodeTmr; +static void * tsUpdateSyncTmr; static SSdbObject tsSdbObj = {0}; static taos_qset tsSdbWriteQset; static taos_qall tsSdbWriteQall; @@ -297,27 +298,25 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { taosFreeQitem(pOper); } -void sdbUpdateSync() { +static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); } + +void sdbUpdateAsync() { + taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsUpdateSyncTmr); +} + +void sdbUpdateSync(void *pMnodes) { + SDMMnodeInfos *mnodes = pMnodes; if (!mnodeIsRunning()) { - mDebug("mnode not start yet, update sync info later"); + mDebug("mnode not start yet, update sync config later"); return; } - mDebug("update sync info in sdb"); + mDebug("update sync config in sync module, mnodes:%p", pMnodes); SSyncCfg syncCfg = {0}; int32_t index = 0; - SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); - for (int32_t i = 0; i < mnodes->nodeNum; ++i) { - SDMMnodeInfo *node = &mnodes->nodeInfos[i]; - syncCfg.nodeInfo[i].nodeId = node->nodeId; - taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort); - syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; - index++; - } - - if (index == 0) { + if (mnodes == NULL) { void *pIter = NULL; while (1) { SMnodeObj *pMnode = NULL; @@ -337,9 +336,19 @@ void sdbUpdateSync() { mnodeDecMnodeRef(pMnode); } sdbFreeIter(pIter); + syncCfg.replica = index; + mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica); + } else { + for (index = 0; index < mnodes->nodeNum; ++index) { + SDMMnodeInfo *node = &mnodes->nodeInfos[index]; + syncCfg.nodeInfo[index].nodeId = node->nodeId; + taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort); + syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC; + } + syncCfg.replica = index; + mDebug("mnodes info input, numOfMnodes:%d", syncCfg.replica); } - syncCfg.replica = index; syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; bool hasThisDnode = false; @@ -350,8 +359,15 @@ void sdbUpdateSync() { } } - if (!hasThisDnode) return; - if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return; + if (!hasThisDnode) { + sdbDebug("update sync config, this dnode not exist"); + return; + } + + if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) { + sdbDebug("update sync config, info not changed"); + return; + } sdbInfo("work as mnode, replica:%d", syncCfg.replica); for (int32_t i = 0; i < syncCfg.replica; ++i) { @@ -1038,7 +1054,7 @@ static void *sdbWorkerFp(void *param) { while (1) { numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed); if (numOfMsgs == 0) { - sdbDebug("sdbWorkerFp: got no message from qset, exiting..."); + sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWriteQset); break; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 3e7a946e20cdd5ea95aeb62ea4e59962af3d09ad..283132697d393d586b561d41e43d58160c0447d1 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -310,7 +310,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; if (pVgid->pDnode == pDnode) { - mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pDnode->dnodeId, pVgroup->vgId, pVgid->role); + mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d:%s", pDnode->dnodeId, pVgroup->vgId, pVgid->role, syncRole[pVgid->role]); pVgid->role = pVload->role; if (pVload->role == TAOS_SYNC_ROLE_MASTER) { pVgroup->inUse = i; diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index ab3cfa2dad179628a9ee74d578ff0d4d06ead127..d021745d2b754c21e422f87438d18539a15fd908 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -54,11 +54,17 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); - mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], - epSet->inUse); + mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg, + taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { - mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], - htons(epSet->port[i])); + if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { + epSet->inUse = (i + 1) % epSet->numOfEps; + mDebug("app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], + htons(epSet->port[i]), epSet->inUse); + } else { + mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], + htons(epSet->port[i])); + } } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c index 86a97a6abe251e1fa99219b9fb63a525ba89f7d4..43a8ddbd1a0d207addf0576951b2af9b2be16555 100644 --- a/src/plugins/http/src/httpQueue.c +++ b/src/plugins/http/src/httpQueue.c @@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) { while (1) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { - httpDebug("httpResultQueue: got no message from qset, exiting..."); + httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset); break; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 414d37d8b8c3141784a22bb117a8ec6d92b4096a..ad247ad25b59dfa1618ff9ff03e4c502e0cd3e3d 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -542,10 +542,7 @@ void rpcCancelRequest(void *handle) { if (pContext->pConn) { tDebug("%s, app tries to cancel request", pContext->pConn->info); - pContext->pConn->pReqMsg = NULL; rpcCloseConn(pContext->pConn); - pContext->pConn = NULL; - rpcFreeCont(pContext->pCont); } } @@ -613,8 +610,10 @@ static void rpcReleaseConn(SRpcConn *pConn) { if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg } else { // if there is an outgoing message, free it - if (pConn->outType && pConn->pReqMsg) + if (pConn->outType && pConn->pReqMsg) { + if (pConn->pContext) pConn->pContext->pConn = NULL; rpcFreeMsg(pConn->pReqMsg); + } } // memset could not be used, since lockeBy can not be reset @@ -1121,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; if (pEpSet->numOfEps > 0) { memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); - tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps); - for (int i=0; iepSet.numOfEps; ++i) + tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, + pContext->epSet.inUse); + for (int i = 0; i < pContext->epSet.numOfEps; ++i) { pContext->epSet.port[i] = htons(pContext->epSet.port[i]); + tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i], + pContext->epSet.port[i]); + } } rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index dd9e7684e03e5cfc8c44dc3555a4ad1d144b90b6..0b9bbae92eef3942b7a930f30650c412ce2eba46 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -525,7 +525,7 @@ static void *taosProcessTcpData(void *param) { while (pThreadObj->pHead) { SFdObj *pFdObj = pThreadObj->pHead; pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); + taosReportBrokenLink(pFdObj); } pthread_mutex_destroy(&(pThreadObj->mutex)); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 0daf0b962060e9c9fc40d101d13eab200262c40a..4d5e19af77f203beed8d63e8932023e0d415971e 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -215,6 +215,9 @@ void syncStop(void *param) { pthread_mutex_lock(&(pNode->mutex)); + if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); + if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); + for (int i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; if (pPeer) syncRemovePeer(pPeer); @@ -223,9 +226,6 @@ void syncStop(void *param) { pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; if (pPeer) syncRemovePeer(pPeer); - if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); - if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); - pthread_mutex_unlock(&(pNode->mutex)); syncDecNodeRef(pNode); @@ -313,6 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { // always update version nodeVersion = pWalHead->version; + sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype); + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; // only pkt from RPC or CQ can be forwarded @@ -1189,6 +1191,8 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; SSyncFwds *pSyncFwds = pNode->pSyncFwds; + if (pSyncFwds == NULL) return; + uint64_t time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 8c6d6243eb4f677e9a5436b4449e07df011d2b52..d3c17e7626c4566df32c1e4257bda1a4e68a200a 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) { // thread to exit. void taosQsetThreadResume(taos_qset param) { STaosQset *qset = (STaosQset *)param; + uDebug("qset:%p, it will exit", qset); tsem_post(&qset->sem); } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 8fccb1442f9d9af20e8153be7790ad592fbe010f..e26778e86b9876ab6e6f47f4b41207ea35cafbf5 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG -echo "mDebugFlag 135" >> $TAOS_CFG -echo "sdbDebugFlag 135" >> $TAOS_CFG -echo "dDebugFlag 135" >> $TAOS_CFG -echo "vDebugFlag 135" >> $TAOS_CFG -echo "tsdbDebugFlag 135" >> $TAOS_CFG -echo "cDebugFlag 135" >> $TAOS_CFG -echo "jnidebugFlag 135" >> $TAOS_CFG -echo "odbcdebugFlag 135" >> $TAOS_CFG -echo "httpDebugFlag 135" >> $TAOS_CFG -echo "monitorDebugFlag 135" >> $TAOS_CFG -echo "mqttDebugFlag 135" >> $TAOS_CFG -echo "qdebugFlag 135" >> $TAOS_CFG -echo "rpcDebugFlag 135" >> $TAOS_CFG +echo "mDebugFlag 143" >> $TAOS_CFG +echo "sdbDebugFlag 143" >> $TAOS_CFG +echo "dDebugFlag 143" >> $TAOS_CFG +echo "vDebugFlag 143" >> $TAOS_CFG +echo "tsdbDebugFlag 143" >> $TAOS_CFG +echo "cDebugFlag 143" >> $TAOS_CFG +echo "jnidebugFlag 143" >> $TAOS_CFG +echo "odbcdebugFlag 143" >> $TAOS_CFG +echo "httpDebugFlag 143" >> $TAOS_CFG +echo "monitorDebugFlag 143" >> $TAOS_CFG +echo "mqttDebugFlag 143" >> $TAOS_CFG +echo "qdebugFlag 143" >> $TAOS_CFG +echo "rpcDebugFlag 143" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG -echo "udebugFlag 135" >> $TAOS_CFG -echo "sdebugFlag 135" >> $TAOS_CFG -echo "wdebugFlag 135" >> $TAOS_CFG -echo "cqdebugFlag 135" >> $TAOS_CFG +echo "udebugFlag 143" >> $TAOS_CFG +echo "sdebugFlag 143" >> $TAOS_CFG +echo "wdebugFlag 143" >> $TAOS_CFG +echo "cqdebugFlag 143" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG diff --git a/tests/script/test.sh b/tests/script/test.sh index 96e4ffe689e5098b36ca9993eed66fd7ba9cacef..a68ac4736dc8e7c0fa3b458c722ea58e4c9aec8e 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG echo "numOfLogLines 100000000" >> $TAOS_CFG -echo "dDebugFlag 135" >> $TAOS_CFG -echo "mDebugFlag 135" >> $TAOS_CFG -echo "sdbDebugFlag 135" >> $TAOS_CFG -echo "rpcDebugFlag 135" >> $TAOS_CFG +echo "rpcDebugFlag 143" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG -echo "cDebugFlag 135" >> $TAOS_CFG -echo "httpDebugFlag 135" >> $TAOS_CFG -echo "monitorDebugFlag 135" >> $TAOS_CFG -echo "udebugFlag 135" >> $TAOS_CFG +echo "cDebugFlag 143" >> $TAOS_CFG +echo "udebugFlag 143" >> $TAOS_CFG echo "tablemetakeeptimer 5" >> $TAOS_CFG echo "wal 0" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG diff --git a/tests/script/unique/cluster/vgroup100.sim b/tests/script/unique/cluster/vgroup100.sim index bde6dd2462db261c3f50088fec3419543bdcb07d..7879c5529efa242cabcc37ce93791bc50e7197e5 100644 --- a/tests/script/unique/cluster/vgroup100.sim +++ b/tests/script/unique/cluster/vgroup100.sim @@ -27,7 +27,16 @@ system sh/exec.sh -n dnode2 -s start sql create dnode $hostname3 system sh/exec.sh -n dnode3 -s start -sleep 5000 +sleep 3000 + +$x = 0 +show2: + $x = $x + 1 + sleep 2000 + if $x == 10 then + return -1 + endi + sql show mnodes $dnode1Role = $data2_1 $dnode2Role = $data2_2 @@ -37,6 +46,16 @@ print $dnode1Role print $dnode2Role print $dnode3Role +if $dnode1Role != master then + goto show2 +endi +if $dnode2Role != slave then + goto show2 +endi +if $dnode3Role != slave then + goto show2 +endi + print ============================== step3 $count = 2 while $count < 102 diff --git a/tests/script/unique/mnode/mgmt21.sim b/tests/script/unique/mnode/mgmt21.sim index 53ad0eebe7cc1798647226452479db47d1476528..8409383309dbde5500b9719cd64fd74ca5e384b2 100644 --- a/tests/script/unique/mnode/mgmt21.sim +++ b/tests/script/unique/mnode/mgmt21.sim @@ -26,11 +26,11 @@ $x = 0 show2: $x = $x + 1 sleep 2000 - if $x == 10 then + if $x == 5 then return -1 endi -sql show mnodes +sql show mnodes -x show2 print dnode1 ==> $data2_1 print dnode2 ==> $data2_2 if $data2_1 != master then