From c95b1eeccbbc6ccb12c38e68e80803cea4a73d01 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 22 Apr 2020 21:22:01 +0800 Subject: [PATCH] fix bug while balance --- src/dnode/src/dnodeMgmt.c | 33 +++---------- src/dnode/src/dnodeMnode.c | 1 - src/inc/taosmsg.h | 15 +++--- src/inc/vnode.h | 1 + src/mnode/inc/mgmtVgroup.h | 1 + src/mnode/src/mgmtDnode.c | 7 +-- src/mnode/src/mgmtVgroup.c | 43 ++++++++++++++-- src/util/inc/tutil.h | 2 + src/util/src/tutil.c | 24 +++++++++ src/vnode/src/vnodeMain.c | 49 ++++++++++++++++--- tests/script/sh/deploy.sh | 2 +- tests/script/unique/dnode/balance1.sim | 18 +++---- .../unique/{dnodes => dnode}/basic1.sim | 0 13 files changed, 137 insertions(+), 59 deletions(-) rename tests/script/unique/{dnodes => dnode}/basic1.sim (100%) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 99209c734c..97768ca743 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -33,7 +33,6 @@ static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); -static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); @@ -41,7 +40,6 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; @@ -146,7 +144,14 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp); } - return vnodeCreate(pCreate); + void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId); + if (pVnode != NULL) { + int32_t code = vnodeAlter(pVnode, pCreate); + vnodeRelease(pVnode); + return code; + } else { + return vnodeCreate(pCreate); + } } static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { @@ -156,28 +161,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { return vnodeDrop(pDrop->vgId); } -static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { - SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; - pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); - pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); - pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize); - pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock); - pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock); - pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); - pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); - pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); - pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); - pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp); - - for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { - pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); - pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp); - } - - return 0; -} - static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { // SMDAlterStreamMsg *pStream = pCont; // pStream->uid = htobe64(pStream->uid); diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c index 0c16e0ca84..9672a34a9f 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeMnode.c @@ -33,7 +33,6 @@ int32_t dnodeInitMnode() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c3d745c7ac..d821f3117b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -48,14 +48,12 @@ extern "C" { #define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16 #define TSDB_MSG_TYPE_MD_DROP_VNODE 17 #define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18 -#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19 -#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20 -#define TSDB_MSG_TYPE_MD_DROP_STABLE 21 -#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22 -#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23 -#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24 -#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25 -#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26 +#define TSDB_MSG_TYPE_MD_DROP_STABLE 19 +#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 20 +#define TSDB_MSG_TYPE_MD_ALTER_STREAM 21 +#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 22 +#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 23 +#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 24 // message from client to mnode #define TSDB_MSG_TYPE_CM_CONNECT 31 @@ -512,6 +510,7 @@ typedef struct { uint8_t status; uint8_t role; uint8_t accessState; + uint8_t replica; uint8_t reserved[5]; } SVnodeLoad; diff --git a/src/inc/vnode.h b/src/inc/vnode.h index e8a7a1458f..1714f1336a 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -38,6 +38,7 @@ typedef struct { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); +int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); void vnodeRelease(void *pVnode); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 534e640c4d..058ed06f84 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -47,6 +47,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index bf58adf594..ef3f93ddad 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -335,6 +335,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pStatus->dnodeId == 0) { mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); + } else { + mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); } int32_t openVnodes = htons(pStatus->openVnodes); @@ -349,11 +351,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); } else { mgmtUpdateVgroupStatus(pVgroup, pDnode, pVload); - if (pVload->role == TAOS_SYNC_ROLE_MASTER) { - pVgroup->totalStorage = htobe64(pVload->totalStorage); - pVgroup->compStorage = htobe64(pVload->compStorage); - pVgroup->pointsWritten = htobe64(pVload->pointsWritten); - } mgmtDecVgroupRef(pVgroup); } } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 3bccf385f1..4088b37e8a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -44,9 +44,7 @@ static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, vo static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ; - -static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); -static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; @@ -124,9 +122,25 @@ static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) { static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { SVgObj *pNew = pOper->pObj; SVgObj *pVgroup = mgmtGetVgroup(pNew->vgId); + if (pVgroup != pNew) { + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SDnodeObj *pDnode = pVgroup->vnodeGid[i].pDnode; + if (pDnode != NULL) { + atomic_sub_fetch_32(&pDnode->openVnodes, 1); + } + } + memcpy(pVgroup, pNew, pOper->rowSize); free(pNew); + + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); + pVgroup->vnodeGid[i].pDnode = pDnode; + if (pDnode != NULL) { + atomic_add_fetch_32(&pDnode->openVnodes, 1); + } + } } int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); @@ -232,6 +246,7 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) { } void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload) { + bool dnodeExist = false; for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; if (pVgid->pDnode == pDnode) { @@ -239,9 +254,29 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo if (pVload->role == TAOS_SYNC_ROLE_MASTER) { pVgroup->inUse = i; } + dnodeExist = true; break; } } + + if (!dnodeExist) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); + mError("vgroup:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId); + mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); + return; + } + + if (pVload->role == TAOS_SYNC_ROLE_MASTER) { + pVgroup->totalStorage = htobe64(pVload->totalStorage); + pVgroup->compStorage = htobe64(pVload->compStorage); + pVgroup->pointsWritten = htobe64(pVload->pointsWritten); + } + + if (pVload->replica != pVgroup->numOfVnodes) { + mError("dnode:%d, vgroup:%d replica:%d not match with mgmt:%d", pDnode->dnodeId, pVload->vgId, pVload->replica, + pVgroup->numOfVnodes); + mgmtSendCreateVgroupMsg(pVgroup, NULL); + } } SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { @@ -521,7 +556,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { SMDVnodeDesc *pNodes = pVnode->nodes; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { - SDnodeObj *pDnode = pVgroup->vnodeGid[0].pDnode; + SDnodeObj *pDnode = pVgroup->vnodeGid[j].pDnode; if (pDnode != NULL) { pNodes[j].nodeId = htonl(pDnode->dnodeId); pNodes[j].nodeIp = htonl(pDnode->privateIp); diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index ed58c2e60d..cdcc639151 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -170,6 +170,8 @@ char *taosIpStr(uint32_t ipInt); uint32_t ip2uint(const char *const ip_addr); +void taosRemoveDir(char *rootDir); + #define TAOS_ALLOC_MODE_DEFAULT 0 #define TAOS_ALLOC_MODE_RANDOM_FAIL 1 #define TAOS_ALLOC_MODE_DETECT_LEAK 2 diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 3d91020365..47d66a066e 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -662,4 +662,28 @@ void tzfree(void *ptr) { if (ptr) { free((void *)((char *)ptr - sizeof(size_t))); } +} + +void taosRemoveDir(char *rootDir) { + DIR *dir = opendir(rootDir); + if (dir == NULL) return; + + struct dirent *de = NULL; + while ((de = readdir(dir)) != NULL) { + if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue; + + char filename[1024]; + snprintf(filename, 1023, "%s/%s", rootDir, de->d_name); + if (de->d_type & DT_DIR) { + taosRemoveDir(filename); + } else { + remove(filename); + uPrint("file:%s is removed", filename); + } + } + + closedir(dir); + rmdir(rootDir); + + uPrint("dir:%s is removed", rootDir); } \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 5fd337ceca..5c5b6ff272 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -135,6 +135,39 @@ int32_t vnodeDrop(int32_t vgId) { return TSDB_CODE_SUCCESS; } +int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { + SVnodeObj *pVnode = param; + int32_t code = vnodeSaveCfg(pVnodeCfg); + if (code != TSDB_CODE_SUCCESS) { + dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); + return code; + } + + code = vnodeReadCfg(pVnode); + if (code != TSDB_CODE_SUCCESS) { + dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return code; + } + + code = syncReconfig(pVnode->sync, &pVnode->syncCfg); + if (code != TSDB_CODE_SUCCESS) { + dTrace("pVnode:%p vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode, pVnode->vgId, + tstrerror(code)); + return code; + } + + code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); + if (code != TSDB_CODE_SUCCESS) { + dTrace("pVnode:%p vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode, pVnode->vgId, + tstrerror(code)); + return code; + } + + dTrace("pVnode:%p vgId:%d, vnode is altered", pVnode, pVnode->vgId); + return TSDB_CODE_SUCCESS; +} + int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; pthread_once(&vnodeModuleInit, vnodeInit); @@ -159,7 +192,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, &pVnode->walCfg); + pVnode->wal = walOpen(temp, &pVnode->walCfg); SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; @@ -172,10 +205,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; - pVnode->sync = syncStart(&syncInfo); + pVnode->sync = syncStart(&syncInfo); - pVnode->events = NULL; - pVnode->cq = NULL; + pVnode->events = NULL; + pVnode->cq = NULL; STsdbAppH appH = {0}; appH.appH = (void *)pVnode; @@ -233,7 +266,9 @@ void vnodeRelease(void *pVnodeRaw) { pVnode->wqueue = NULL; if (pVnode->status == TAOS_VN_STATUS_DELETING) { - // remove the whole directory + char rootDir[TSDB_FILENAME_LEN] = {0}; + sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); + taosRemoveDir(rootDir); } free(pVnode); @@ -252,7 +287,8 @@ void *vnodeGetVnode(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_INVALID_VGROUP_ID; - assert(false); + dError("vgId:%d not exist"); + return NULL; } return *ppVnode; @@ -298,6 +334,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { pLoad->vgId = htonl(pVnode->vgId); pLoad->status = pVnode->status; pLoad->role = pVnode->role; + pLoad->replica = pVnode->syncCfg.replica; } static void vnodeCleanUp(SVnodeObj *pVnode) { diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index b1aa7c6382..41ba3c425a 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG echo "dDebugFlag 199" >> $TAOS_CFG echo "mDebugFlag 199" >> $TAOS_CFG echo "sdbDebugFlag 199" >> $TAOS_CFG -echo "rpcDebugFlag 131" >> $TAOS_CFG +echo "rpcDebugFlag 135" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 131" >> $TAOS_CFG diff --git a/tests/script/unique/dnode/balance1.sim b/tests/script/unique/dnode/balance1.sim index 413d4d74b3..555ca3cd6b 100644 --- a/tests/script/unique/dnode/balance1.sim +++ b/tests/script/unique/dnode/balance1.sim @@ -25,15 +25,15 @@ system sh/cfg.sh -n dnode2 -c mgmtEqualVnodeNum -v 4 system sh/cfg.sh -n dnode3 -c mgmtEqualVnodeNum -v 4 system sh/cfg.sh -n dnode4 -c mgmtEqualVnodeNum -v 4 -system sh/cfg.sh -n dnode1 -c clog -v 1 -system sh/cfg.sh -n dnode2 -c clog -v 1 -system sh/cfg.sh -n dnode3 -c clog -v 1 -system sh/cfg.sh -n dnode4 -c clog -v 1 - -system sh/cfg.sh -n dnode1 -c clog -v 1 -system sh/cfg.sh -n dnode2 -c clog -v 1 -system sh/cfg.sh -n dnode3 -c clog -v 1 -system sh/cfg.sh -n dnode4 -c clog -v 1 +system sh/cfg.sh -n dnode1 -c clog -v 2 +system sh/cfg.sh -n dnode2 -c clog -v 2 +system sh/cfg.sh -n dnode3 -c clog -v 2 +system sh/cfg.sh -n dnode4 -c clog -v 2 + +system sh/cfg.sh -n dnode1 -c clog -v 2 +system sh/cfg.sh -n dnode2 -c clog -v 2 +system sh/cfg.sh -n dnode3 -c clog -v 2 +system sh/cfg.sh -n dnode4 -c clog -v 2 print ========== step1 system sh/exec_up.sh -n dnode1 -s start diff --git a/tests/script/unique/dnodes/basic1.sim b/tests/script/unique/dnode/basic1.sim similarity index 100% rename from tests/script/unique/dnodes/basic1.sim rename to tests/script/unique/dnode/basic1.sim -- GitLab