From 78fb1911d48bab09d76c9fd4d7de564622b104ad Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 14 Oct 2022 17:59:09 +0800 Subject: [PATCH] enh: refactor the code to create and delete mnodes --- include/dnode/mnode/mnode.h | 5 +- include/libs/sync/sync.h | 4 +- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 4 +- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 112 ++++-- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 28 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 42 ++- source/dnode/mnode/impl/inc/mndInt.h | 14 +- source/dnode/mnode/impl/inc/mndSync.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 3 +- source/dnode/mnode/impl/src/mndMain.c | 89 ++--- source/dnode/mnode/impl/src/mndMnode.c | 360 ++++++++++---------- source/dnode/mnode/impl/src/mndSync.c | 31 +- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/sdb/inc/sdb.h | 3 +- source/dnode/mnode/sdb/src/sdbHash.c | 22 +- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncMain.c | 21 +- tests/script/jenkins/basic.txt | 2 +- tests/script/tsim/mnode/basic1.sim | 6 +- tests/script/tsim/mnode/basic4.sim | 50 +-- tests/script/tsim/mnode/basic5.sim | 5 +- tests/system-test/fulltest.sh | 6 +- 23 files changed, 425 insertions(+), 390 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 0d43539629..cdb1642a5c 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -30,9 +30,10 @@ typedef struct SMnode SMnode; typedef struct { int32_t dnodeId; - bool standby; bool deploy; - SReplica replica; + int8_t selfIndex; + int8_t numOfReplicas; + SReplica replicas[TSDB_MAX_REPLICA]; SMsgCb msgCb; } SMnodeOpt; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 411a63a379..0f8220f19c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -211,7 +211,7 @@ typedef struct SSyncInfo { int32_t syncInit(); void syncCleanUp(); -int64_t syncOpen(const SSyncInfo* pSyncInfo); +int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); int32_t syncSetStandby(int64_t rid); @@ -233,7 +233,7 @@ const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); +int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg); // build SRpcMsg, need to call syncPropose with SRpcMsg int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index a4c37dd334..17cbde437b 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -41,8 +41,8 @@ typedef struct SMnodeMgmt { } SMnodeMgmt; // mmFile.c -int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed); -int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed); +int32_t mmReadFile(const char *path, SMnodeOpt *pOption); +int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption); // mmHandle.c SArray *mmGetMsgHandles(); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 27a35ae17a..c5ddb9f021 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) { +int32_t mmReadFile(const char *path, SMnodeOpt *pOption) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 4096; @@ -25,7 +25,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) { char file[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%smnode.json", path, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { code = 0; @@ -50,38 +50,69 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) { dError("failed to read %s since deployed not found", file); goto _OVER; } - *pDeployed = deployed->valueint; + pOption->deploy = deployed->valueint; - cJSON *id = cJSON_GetObjectItem(root, "id"); - if (id) { - if (id->type != cJSON_Number) { - dError("failed to read %s since id not found", file); + cJSON *selfIndex = cJSON_GetObjectItem(root, "selfIndex"); + if (selfIndex) { + if (selfIndex->type != cJSON_Number) { + dError("failed to read %s since selfIndex not found", file); goto _OVER; } - if (pReplica) { - pReplica->id = id->valueint; - } + pOption->selfIndex = selfIndex->valueint; } - cJSON *fqdn = cJSON_GetObjectItem(root, "fqdn"); - if (fqdn) { - if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", file); + cJSON *replicas = cJSON_GetObjectItem(root, "replicas"); + if (replicas) { + if (replicas->type != cJSON_Array) { + dError("failed to read %s since replicas not found", file); goto _OVER; } - if (pReplica) { - tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); - } - } - cJSON *port = cJSON_GetObjectItem(root, "port"); - if (port) { - if (port->type != cJSON_Number) { - dError("failed to read %s since port not found", file); + int32_t numOfReplicas = cJSON_GetArraySize(replicas); + if (numOfReplicas <= 0) { + dError("failed to read %s since numOfReplicas:%d invalid", file, numOfReplicas); goto _OVER; } - if (pReplica) { - pReplica->port = (uint16_t)port->valueint; + pOption->numOfReplicas = numOfReplicas; + + for (int32_t i = 0; i < numOfReplicas; ++i) { + SReplica *pReplica = pOption->replicas + i; + + cJSON *replica = cJSON_GetArrayItem(replicas, i); + if (replica == NULL) break; + + cJSON *id = cJSON_GetObjectItem(replica, "id"); + if (id) { + if (id->type != cJSON_Number) { + dError("failed to read %s since id not found", file); + goto _OVER; + } + if (pReplica) { + pReplica->id = id->valueint; + } + } + + cJSON *fqdn = cJSON_GetObjectItem(replica, "fqdn"); + if (fqdn) { + if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) { + dError("failed to read %s since fqdn not found", file); + goto _OVER; + } + if (pReplica) { + tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); + } + } + + cJSON *port = cJSON_GetObjectItem(replica, "port"); + if (port) { + if (port->type != cJSON_Number) { + dError("failed to read %s since port not found", file); + goto _OVER; + } + if (pReplica) { + pReplica->port = (uint16_t)port->valueint; + } + } } } @@ -92,18 +123,18 @@ _OVER: if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); if (code == 0) { - dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); + dDebug("succcessed to read file %s, deployed:%d", file, pOption->deploy); } terrno = code; return code; } -int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) { +int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -117,12 +148,25 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - if (pReplica != NULL && pReplica->id > 0) { - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n,", pReplica->port); + if (pOption->deploy && pOption->numOfReplicas > 0) { + len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pOption->selfIndex); + len += snprintf(content + len, maxLen - len, " \"replicas\": [{\n"); + + for (int32_t i = 0; i < pOption->numOfReplicas; ++i) { + const SReplica *pReplica = pOption->replicas + i; + if (pReplica != NULL && pReplica->id > 0) { + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); + } + if (i < pOption->numOfReplicas - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }],\n"); + } + } } - len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", pOption->deploy); len += snprintf(content + len, maxLen - len, "}\n"); taosWriteFile(pFile, content, len); @@ -136,6 +180,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) return -1; } - dDebug("successed to write %s, deployed:%d", realfile, deployed); + dDebug("successed to write %s, deployed:%d", realfile, pOption->deploy); return 0; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ec761e6441..fc45dbf15f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -80,18 +80,21 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { return -1; } - if (createReq.replica != 1) { + SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica, .selfIndex = -1}; + memcpy(option.replicas, createReq.replicas, sizeof(createReq.replicas)); + for (int32_t i = 0; i < option.numOfReplicas; ++i) { + if (createReq.replicas[i].id == pInput->pData->dnodeId) { + option.selfIndex = i; + } + } + + if (option.selfIndex == -1) { terrno = TSDB_CODE_INVALID_OPTION; - dGError("failed to create mnode since %s", terrstr()); + dGError("failed to create mnode since %s, selfIndex is -1", terrstr()); return -1; } - bool deployed = true; - - SMnodeMgmt mgmt = {0}; - mgmt.path = pInput->path; - mgmt.name = pInput->name; - if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) { + if (mmWriteFile(pInput->path, &option) != 0) { dGError("failed to write mnode file since %s", terrstr()); return -1; } @@ -113,12 +116,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { return -1; } - bool deployed = false; - - SMnodeMgmt mgmt = {0}; - mgmt.path = pInput->path; - mgmt.name = pInput->name; - if (mmWriteFile(&mgmt, NULL, deployed) != 0) { + SMnodeOpt option = {.deploy = false}; + if (mmWriteFile(pInput->path, &option) != 0) { dGError("failed to write mnode file since %s", terrstr()); return -1; } @@ -207,7 +206,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - // if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 49207225a5..e7f71ad420 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -25,38 +25,35 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) { } static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { - SMnodeMgmt mgmt = {0}; - mgmt.path = pInput->path; - if (mmReadFile(&mgmt, NULL, required) != 0) { + SMnodeOpt option = {0}; + if (mmReadFile(pInput->path, &option) != 0) { return -1; } - if (!(*required)) { + if (!option.deploy) { *required = mmDeployRequired(pInput); + } else { + *required = true; } return 0; } static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) { - pOption->standby = false; pOption->deploy = true; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - pOption->replica.id = 1; - pOption->replica.port = tsServerPort; - tstrncpy(pOption->replica.fqdn, tsLocalFqdn, TSDB_FQDN_LEN); + pOption->selfIndex = 0; + pOption->numOfReplicas = 1; + pOption->replicas[0].id = 1; + pOption->replicas[0].port = tsServerPort; + tstrncpy(pOption->replicas[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); } -static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) { - pOption->standby = false; +static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->deploy = false; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - if (pReplica->id > 0) { - pOption->standby = true; - pOption->replica = *pReplica; - } } static void mmClose(SMnodeMgmt *pMgmt) { @@ -95,22 +92,20 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.mgmt = pMgmt; taosThreadRwlockInit(&pMgmt->lock, NULL); - bool deployed = false; - SReplica replica = {0}; - if (mmReadFile(pMgmt, &replica, &deployed) != 0) { + SMnodeOpt option = {0}; + if (mmReadFile(pMgmt->path, &option) != 0) { dError("failed to read file since %s", terrstr()); mmClose(pMgmt); return -1; } - SMnodeOpt option = {0}; - if (!deployed) { + if (!option.deploy) { dInfo("mnode start to deploy"); pMgmt->pData->dnodeId = 1; mmBuildOptionForDeploy(pMgmt, pInput, &option); } else { dInfo("mnode start to open"); - mmBuildOptionForOpen(pMgmt, &replica, &option); + mmBuildOptionForOpen(pMgmt, &option); } pMgmt->pMnode = mndOpen(pMgmt->path, &option); @@ -128,9 +123,10 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("mnode-worker", "initialized"); - if (!deployed || replica.id > 0) { - deployed = true; - if (mmWriteFile(pMgmt, NULL, deployed) != 0) { + if (option.numOfReplicas > 0) { + option.deploy = true; + option.numOfReplicas = 0; + if (mmWriteFile(pMgmt->path, &option) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 71315a39fd..f55e830a44 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -87,12 +87,13 @@ typedef struct { typedef struct { tsem_t syncSem; int64_t sync; - SReplica replica; int32_t errCode; int32_t transId; SRWLatch lock; - int8_t standby; int8_t leaderTransferFinish; + int8_t selfIndex; + int8_t numOfReplicas; + SReplica replicas[TSDB_MAX_REPLICA]; } SSyncMgmt; typedef struct { @@ -130,11 +131,10 @@ typedef struct SMnode { void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); int64_t mndGenerateUid(const char *name, int32_t len); -int32_t mndAcquireRpcRef(SMnode *pMnode); -void mndReleaseRpcRef(SMnode *pMnode); -void mndSetRestore(SMnode *pMnode, bool restored); -void mndSetStop(SMnode *pMnode); -bool mndGetStop(SMnode *pMnode); +void mndSetRestored(SMnode *pMnode, bool restored); +bool mndGetRestored(SMnode *pMnode); +void mndSetStop(SMnode *pMnode); +bool mndGetStop(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index cb9d70d5ee..993efbcd08 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -24,7 +24,7 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); -bool mndIsMaster(SMnode *pMnode); +bool mndIsLeader(SMnode *pMnode); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId); void mndSyncStart(SMnode *pMnode); void mndSyncStop(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6670211d78..3b31873857 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1809,7 +1809,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc } while (numOfRows < rowsCapacity) { - pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus); + pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true); if (pShow->pIter == NULL) break; if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) == 0) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 9ade39e8e7..fba1fd94d6 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -423,7 +423,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { goto _OVER; } else { pDnode->accessTimes++; - mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + mDebug("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); } } @@ -471,6 +471,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { } pDnode->lastAccessTime = curMs; + pDnode->accessTimes++; code = 0; _OVER: diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 275fc76e36..a52ca1bd42 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -43,6 +43,37 @@ #include "mndUser.h" #include "mndVgroup.h" +static inline int32_t mndAcquireRpc(SMnode *pMnode) { + int32_t code = 0; + taosThreadRwlockRdlock(&pMnode->lock); + if (pMnode->stopped) { + terrno = TSDB_CODE_APP_NOT_READY; + code = -1; + } else if (!mndIsLeader(pMnode)) { + code = -1; + } else { +#if 1 + atomic_add_fetch_32(&pMnode->rpcRef, 1); +#else + int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); + mTrace("mnode rpc is acquired, ref:%d", ref); +#endif + } + taosThreadRwlockUnlock(&pMnode->lock); + return code; +} + +static inline void mndReleaseRpc(SMnode *pMnode) { + taosThreadRwlockRdlock(&pMnode->lock); +#if 1 + atomic_sub_fetch_32(&pMnode->rpcRef, 1); +#else + int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); + mTrace("mnode rpc is released, ref:%d", ref); +#endif + taosThreadRwlockUnlock(&pMnode->lock); +} + static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -338,8 +369,9 @@ static int32_t mndExecSteps(SMnode *pMnode) { static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->msgCb = pOption->msgCb; pMnode->selfDnodeId = pOption->dnodeId; - pMnode->syncMgmt.replica = pOption->replica; - pMnode->syncMgmt.standby = pOption->standby; + pMnode->syncMgmt.selfIndex = pOption->selfIndex; + pMnode->syncMgmt.numOfReplicas = pOption->numOfReplicas; + memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas)); } SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { @@ -430,7 +462,7 @@ int32_t mndStart(SMnode *pMnode) { mError("failed to deploy sdb while start mnode"); return -1; } - mndSetRestore(pMnode, true); + mndSetRestored(pMnode, true); } grantReset(pMnode, TSDB_GRANT_ALL, 0); @@ -570,23 +602,27 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { return 0; } - if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; + if (mndAcquireRpc(pMsg->info.node) == 0) return 0; if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { return -1; } - SEpSet epSet = {0}; - mndGetMnodeEpSet(pMsg->info.node, &epSet); + SEpSet epSet = {0}; + SMnode *pMnode = pMsg->info.node; + mndGetMnodeEpSet(pMnode, &epSet); const STraceId *trace = &pMsg->info.traceId; - mError("msg:%p, failed to check mnode state since %s, type:%s, numOfMnodes:%d inUse:%d", pMsg, terrstr(), - TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse); + mDebug( + "msg:%p, failed to check mnode state since %s, mnode restored:%d stopped:%d, sync restored:%d role:%s type:%s " + "numOfEps:%d inUse:%d", + pMsg, terrstr(), pMnode->restored, pMnode->stopped, syncIsRestoreFinish(pMnode->syncMgmt.sync), + syncGetMyRoleStr(pMnode->syncMgmt.sync), TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse); if (epSet.numOfEps > 0) { for (int32_t i = 0; i < epSet.numOfEps; ++i) { - mInfo("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); } int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); @@ -633,7 +669,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); int32_t code = (*fp)(pMsg); - mndReleaseRpcRef(pMnode); + mndReleaseRpc(pMnode); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mGTrace("msg:%p, won't response immediately since in progress", pMsg); @@ -669,7 +705,7 @@ int64_t mndGenerateUid(const char *name, int32_t len) { int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { - if (mndAcquireRpcRef(pMnode) != 0) return -1; + if (mndAcquireRpc(pMnode) != 0) return -1; SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); @@ -680,7 +716,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc)); if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL || pStbInfo->stbs == NULL) { - mndReleaseRpcRef(pMnode); + mndReleaseRpc(pMnode); return -1; } @@ -800,7 +836,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_total = INT32_MAX; } - mndReleaseRpcRef(pMnode); + mndReleaseRpc(pMnode); return 0; } @@ -810,32 +846,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; } -int32_t mndAcquireRpcRef(SMnode *pMnode) { - int32_t code = 0; - taosThreadRwlockRdlock(&pMnode->lock); - if (pMnode->stopped) { - mTrace("mnode not running"); - terrno = TSDB_CODE_APP_NOT_READY; - code = -1; - } else if (!mndIsMaster(pMnode)) { - mTrace("mnode not ready, role:%s restored:%d", syncGetMyRoleStr(pMnode->syncMgmt.sync), pMnode->restored); - code = -1; - } else { - int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); - // mTrace("mnode rpc is acquired, ref:%d", ref); - } - taosThreadRwlockUnlock(&pMnode->lock); - return code; -} - -void mndReleaseRpcRef(SMnode *pMnode) { - taosThreadRwlockRdlock(&pMnode->lock); - int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); - // mTrace("mnode rpc is released, ref:%d", ref); - taosThreadRwlockUnlock(&pMnode->lock); -} - -void mndSetRestore(SMnode *pMnode, bool restored) { +void mndSetRestored(SMnode *pMnode, bool restored) { if (restored) { taosThreadRwlockWrlock(&pMnode->lock); pMnode->restored = true; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index d0440359ff..c27241317c 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -36,6 +36,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq); static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); +static void mndReloadSyncConfig(SMnode *pMnode); int32_t mndInitMnode(SMnode *pMnode) { SSdbTable table = { @@ -187,6 +188,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { } pObj->state = TAOS_SYNC_STATE_ERROR; + mndReloadSyncConfig(pSdb->pMnode); return 0; } @@ -203,6 +205,8 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) { mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); pOld->updateTime = pNew->updateTime; + mndReloadSyncConfig(pSdb->pMnode); + return 0; } @@ -233,7 +237,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pIter == NULL) break; if (pObj->id == pMnode->selfDnodeId) { - if (mndIsMaster(pMnode)) { + if (mndIsLeader(pMnode)) { pEpSet->inUse = pEpSet->numOfEps; } else { pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes; @@ -248,6 +252,10 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pEpSet->numOfEps == 0) { syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet); } + + if (pEpSet->inUse >= pEpSet->numOfEps) { + pEpSet->inUse = 0; + } } static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { @@ -274,13 +282,72 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod return 0; } +static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) { + int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq); + void *pReq = taosMemoryMalloc(contLen); + tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq); + + STransAction action = { + .epSet = *pCreateEpSet, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_CREATE_MNODE, + .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED, + }; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + return 0; +} + +static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) { + int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq); + void *pReq = taosMemoryMalloc(contLen); + tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq); + + STransAction action = { + .epSet = *pAlterEpSet, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_MND_ALTER_MNODE, + .acceptableCode = 0, + }; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) { + int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, pDropReq); + void *pReq = taosMemoryMalloc(contLen); + tSerializeSCreateDropMQSBNodeReq(pReq, contLen, pDropReq); + + STransAction action = { + .epSet = *pDroprEpSet, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_DROP_MNODE, + .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED, + }; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + return 0; +} + static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; int32_t numOfReplicas = 0; - SDAlterMnodeReq alterReq = {0}; SDCreateMnodeReq createReq = {0}; - SEpSet alterEpset = {0}; SEpSet createEpset = {0}; while (1) { @@ -288,75 +355,25 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; - alterReq.replicas[numOfReplicas].id = pMObj->id; - alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port; - memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); - - alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port; - memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); - if (pMObj->state == TAOS_SYNC_STATE_LEADER) { - alterEpset.inUse = numOfReplicas; - } + createReq.replicas[numOfReplicas].id = pMObj->id; + createReq.replicas[numOfReplicas].port = pMObj->pDnode->port; + memcpy(createReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); numOfReplicas++; sdbRelease(pSdb, pMObj); } - alterReq.replica = numOfReplicas + 1; - alterReq.replicas[numOfReplicas].id = pDnode->id; - alterReq.replicas[numOfReplicas].port = pDnode->port; - memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - - alterEpset.numOfEps = numOfReplicas + 1; - alterEpset.eps[numOfReplicas].port = pDnode->port; - memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - - createReq.replica = 1; - createReq.replicas[0].id = pDnode->id; - createReq.replicas[0].port = pDnode->port; - memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + createReq.replica = numOfReplicas + 1; + createReq.replicas[numOfReplicas].id = pDnode->id; + createReq.replicas[numOfReplicas].port = pDnode->port; + memcpy(createReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + createEpset.inUse = 0; createEpset.numOfEps = 1; createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - { - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); - - STransAction action = { - .epSet = createEpset, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_DND_CREATE_MNODE, - .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - - { - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); - - STransAction action = { - .epSet = alterEpset, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_MND_ALTER_MNODE, - .acceptableCode = 0, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } + if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1; return 0; } @@ -374,9 +391,9 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, mndTransSetSerial(pTrans); mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; - if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndTransAppendNullLog(pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -459,107 +476,28 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO } static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - int32_t numOfReplicas = 0; - SDAlterMnodeReq alterReq = {0}; - SDDropMnodeReq dropReq = {0}; - SSetStandbyReq standbyReq = {0}; - SEpSet alterEpset = {0}; - SEpSet dropEpSet = {0}; - - while (1) { - SMnodeObj *pMObj = NULL; - pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); - if (pIter == NULL) break; - if (pMObj->id == pObj->id) { - sdbRelease(pSdb, pMObj); - continue; - } - - alterReq.replicas[numOfReplicas].id = pMObj->id; - alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port; - memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); - - alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port; - memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); - if (pMObj->state == TAOS_SYNC_STATE_LEADER) { - alterEpset.inUse = numOfReplicas; - } - - numOfReplicas++; - sdbRelease(pSdb, pMObj); - } - - alterReq.replica = numOfReplicas; - alterEpset.numOfEps = numOfReplicas; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; + SDDropMnodeReq dropReq = {0}; + SEpSet dropEpSet = {0}; dropReq.dnodeId = pDnode->id; dropEpSet.numOfEps = 1; dropEpSet.eps[0].port = pDnode->port; memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - standbyReq.dnodeId = pDnode->id; - standbyReq.standby = 1; - - { - int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq) + sizeof(SMsgHead); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), contLen, &standbyReq); - SMsgHead *pHead = pReq; - pHead->contLen = htonl(contLen); - pHead->vgId = htonl(MNODE_HANDLE); - - STransAction action = { - .epSet = dropEpSet, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_SYNC_SET_MNODE_STANDBY, - .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - - { - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); - - STransAction action = { - .epSet = alterEpset, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_MND_ALTER_MNODE, - .acceptableCode = 0, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - - { - int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq); - - STransAction action = { - .epSet = dropEpSet, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_DND_DROP_MNODE, - .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } + int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE); + if (totalMnodes == 2) { + mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes); + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1; + if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1; + } else if (totalMnodes == 3) { + mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes); + if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1; + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1; + } else { + return -1; } return 0; @@ -567,7 +505,6 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { if (pObj == NULL) return 0; - if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1; if (mndTransAppendNullLog(pTrans) != 0) return -1; @@ -657,7 +594,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB int64_t curMs = taosGetTimestampMs(); while (numOfRows < rows) { - pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus); + pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true); if (pShow->pIter == NULL) break; cols = 0; @@ -712,6 +649,9 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { } static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { +#if 1 + return 0; +#else SMnode *pMnode = pReq->info.node; SDAlterMnodeReq alterReq = {0}; @@ -720,41 +660,107 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { return -1; } + SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1}; + memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas)); + for (int32_t i = 0; i < option.numOfReplicas; ++i) { + if (alterReq.replicas[i].id == pMnode->selfDnodeId) { + option.selfIndex = i; + } + } + + if (option.selfIndex == -1) { + mInfo("alter mnode not processed since selfIndex is -1", terrstr()); + return 0; + } + + if (mndWriteFile(pMnode->path, &option) != 0) { + mError("failed to write mnode file since %s", terrstr()); + return -1; + } + SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1}; for (int32_t i = 0; i < alterReq.replica; ++i) { SNodeInfo *pNode = &cfg.nodeInfo[i]; tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn)); pNode->nodePort = alterReq.replicas[i].port; - if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i; + if (alterReq.replicas[i].id == pMnode->selfDnodeId) { + cfg.myIndex = i; + } } if (cfg.myIndex == -1) { mError("failed to alter mnode since myindex is -1"); return -1; } else { - mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex); + mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); for (int32_t i = 0; i < alterReq.replica; ++i) { SNodeInfo *pNode = &cfg.nodeInfo[i]; mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); } } - mInfo("trans:-1, sync reconfig will be proposed"); - - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->standby = 0; - int32_t code = syncReconfig(pMgmt->sync, &cfg); + int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg); if (code != 0) { - mError("trans:-1, failed to propose sync reconfig since %s", terrstr()); - return code; + mError("failed to sync reconfig since %s", terrstr()); } else { - pMgmt->errCode = 0; - taosWLockLatch(&pMgmt->lock); - pMgmt->transId = -1; - taosWUnLockLatch(&pMgmt->lock); - tsem_wait(&pMgmt->syncSem); - mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode)); - terrno = pMgmt->errCode; - return pMgmt->errCode; + mInfo("alter mnode sync success"); + } + + return code; +#endif +} + +static void mndReloadSyncConfig(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SMnodeObj *pObj = NULL; + ESdbStatus objStatus = 0; + void *pIter = NULL; + bool hasUpdatingMnode = false; + SSyncCfg cfg = {.myIndex = -1}; + + while (1) { + pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false); + if (pIter == NULL) break; + if (objStatus == SDB_STATUS_CREATING || objStatus == SDB_STATUS_DROPPING) { + mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus)); + hasUpdatingMnode = true; + } + + if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) { + SNodeInfo *pNode = &cfg.nodeInfo[cfg.replicaNum]; + tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pObj->pDnode->port; + if (pObj->pDnode->id == pMnode->selfDnodeId) { + cfg.myIndex = cfg.replicaNum; + } + cfg.replicaNum++; + } + + sdbReleaseLock(pSdb, pObj, false); + } + + if (cfg.myIndex == -1) { + mInfo("vgId:1, mnode not reload since selfIndex is -1"); + return; + } + + if (!mndGetRestored(pMnode)) { + mInfo("vgId:1, mnode not reload since restore not finished"); + return; + } + + if (hasUpdatingMnode) { + mInfo("vgId:1, start to reload mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); + for (int32_t i = 0; i < cfg.replicaNum; ++i) { + SNodeInfo *pNode = &cfg.nodeInfo[i]; + mInfo("vgId:1, index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); + } + + int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg); + if (code != 0) { + mError("vgId:1, failed to reconfig mnode sync since %s", terrstr()); + } else { + mInfo("vgId:1, reconfig mnode sync success"); + } } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0e4dd9fc3c..866a3fa8b9 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -107,7 +107,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { if (!pMnode->deploy) { mInfo("vgId:1, sync restore finished, and will handle outstanding transactions"); mndTransPullup(pMnode); - mndSetRestore(pMnode, true); + mndSetRestored(pMnode, true); } else { mInfo("vgId:1, sync restore finished"); } @@ -225,18 +225,17 @@ int32_t mndInitSync(SMnode *pMnode) { snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); syncInfo.pWal = pMnode->pWal; syncInfo.pFsm = mndSyncMakeFsm(pMnode); - syncInfo.isStandBy = pMgmt->standby; syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT; - mInfo("vgId:1, start to open sync, standby:%d", pMgmt->standby); - if (pMgmt->standby || pMgmt->replica.id > 0) { - SSyncCfg *pCfg = &syncInfo.syncCfg; - pCfg->replicaNum = 1; - pCfg->myIndex = 0; - SNodeInfo *pNode = &pCfg->nodeInfo[0]; - tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn)); - pNode->nodePort = pMgmt->replica.port; - mInfo("vgId:1, ep:%s:%u", pNode->nodeFqdn, pNode->nodePort); + mInfo("vgId:1, start to open sync, selfIndex:%d replica:%d", pMgmt->selfIndex, pMgmt->numOfReplicas); + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->replicaNum = pMgmt->numOfReplicas; + pCfg->myIndex = pMgmt->selfIndex; + for (int32_t i = 0; i < pMgmt->numOfReplicas; ++i) { + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pMgmt->replicas[i].port; + mInfo("vgId:1, index:%d ep:%s:%u", i, pNode->nodeFqdn, pNode->nodePort); } tsem_init(&pMgmt->syncSem, 0, 0); @@ -250,10 +249,6 @@ int32_t mndInitSync(SMnode *pMnode) { setPingTimerMS(pMgmt->sync, 5000); setElectTimerMS(pMgmt->sync, 3000); setHeartbeatTimerMS(pMgmt->sync, 500); - /* - setElectTimerMS(pMgmt->sync, 600); - setHeartbeatTimerMS(pMgmt->sync, 300); - */ mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; @@ -319,7 +314,7 @@ void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncStart(pMgmt->sync); - mInfo("vgId:1, sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); + mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync); } void mndSyncStop(SMnode *pMnode) { @@ -331,7 +326,7 @@ void mndSyncStop(SMnode *pMnode) { taosWUnLockLatch(&pMnode->syncMgmt.lock); } -bool mndIsMaster(SMnode *pMnode) { +bool mndIsLeader(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; if (!syncIsReady(pMgmt->sync)) { @@ -340,7 +335,7 @@ bool mndIsMaster(SMnode *pMnode) { return false; } - if (!pMnode->restored) { + if (!mndGetRestored(pMnode)) { terrno = TSDB_CODE_APP_NOT_READY; return false; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index d30cc8f9ca..a64b07fdbd 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -53,7 +53,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); -static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); } +static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); } static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransTimer(SRpcMsg *pReq); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index f922baf329..be93bd2f59 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -298,6 +298,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey); * @param pObj The object of the row. */ void sdbRelease(SSdb *pSdb, void *pObj); +void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock); /** * @brief Traverse a sdb table @@ -309,7 +310,7 @@ void sdbRelease(SSdb *pSdb, void *pObj); * @return void* The next iterator of the table. */ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj); -void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status); +void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status, bool lock); /** * @brief Cancel a traversal diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index ecdf8c71a7..0fee0fbca1 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -327,14 +327,16 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { taosThreadRwlockUnlock(pLock); } -void sdbRelease(SSdb *pSdb, void *pObj) { +void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { if (pObj == NULL) return; SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); if (pRow->type >= SDB_MAX) return; TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; - taosThreadRwlockWrlock(pLock); + if (lock) { + taosThreadRwlockWrlock(pLock); + } int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); sdbPrintOper(pSdb, pRow, "release"); @@ -342,9 +344,13 @@ void sdbRelease(SSdb *pSdb, void *pObj) { sdbFreeRow(pSdb, pRow, true); } - taosThreadRwlockUnlock(pLock); + if (lock) { + taosThreadRwlockUnlock(pLock); + } } +void sdbRelease(SSdb *pSdb, void *pObj) { sdbReleaseLock(pSdb, pObj, true); } + void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { *ppObj = NULL; @@ -372,14 +378,16 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { return ppRow; } -void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status) { +void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status, bool lock) { *ppObj = NULL; SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL; TdThreadRwlock *pLock = &pSdb->locks[type]; - taosThreadRwlockRdlock(pLock); + if (lock) { + taosThreadRwlockRdlock(pLock); + } SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { @@ -395,7 +403,9 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat *status = pRow->status; break; } - taosThreadRwlockUnlock(pLock); + if (lock) { + taosThreadRwlockUnlock(pLock); + } return ppRow; } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0afc373f2d..101e99afea 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -169,7 +169,7 @@ typedef struct SSyncNode { } SSyncNode; // open/close -------------- -SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 378ac743b3..af0e6ed960 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -86,10 +86,10 @@ void syncCleanUp() { } } -int64_t syncOpen(const SSyncInfo* pSyncInfo) { +int64_t syncOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); if (pSyncNode == NULL) { - sError("failed to open sync node. vgId:%d", pSyncInfo->vgId); + sError("vgId:%d, failed to open sync node since %s", pSyncInfo->vgId, terrstr()); return -1; } @@ -230,7 +230,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg return ret; } -int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { +int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -238,6 +238,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { } ASSERT(rid == pSyncNode->rid); +#if 0 if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; @@ -259,6 +260,12 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; +#else + syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); + syncNodeDoConfigChange(pSyncNode, pNewCfg, 0); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +#endif } int32_t syncLeaderTransfer(int64_t rid) { @@ -919,9 +926,7 @@ _END: } // open/close -------------- -SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { - SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -957,7 +962,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); goto _error; } - pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; + if (pSyncInfo->syncCfg.replicaNum == 0) { + pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; + } raftCfgClose(pSyncNode->pRaftCfg); pSyncNode->pRaftCfg = NULL; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 82f73a4fdd..1fbabe5265 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -194,7 +194,7 @@ ./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic2.sim ./test.sh -f tsim/mnode/basic3.sim -# TD-17919 ./test.sh -f tsim/mnode/basic4.sim +./test.sh -f tsim/mnode/basic4.sim ./test.sh -f tsim/mnode/basic5.sim # ---- show ---- diff --git a/tests/script/tsim/mnode/basic1.sim b/tests/script/tsim/mnode/basic1.sim index 59156080c8..31fc276298 100644 --- a/tests/script/tsim/mnode/basic1.sim +++ b/tests/script/tsim/mnode/basic1.sim @@ -42,6 +42,7 @@ sql_error drop mnode on dnode 1 print =============== create mnode 2 sql create mnode on dnode 2 +print =============== create mnode 2 finished $x = 0 step2: $x = $x + 1 @@ -69,9 +70,10 @@ if $data(2)[2] != follower then goto step2 endi -sleep 2000 print ============ drop mnode 2 sql drop mnode on dnode 2 + +print ============ drop mnode 2 finished sql select * from information_schema.ins_mnodes if $rows != 1 then return -1 @@ -109,6 +111,8 @@ sleep 2000 print =============== create mnodes sql create mnode on dnode 2 + +print =============== create mnode 2 finished sql select * from information_schema.ins_mnodes if $rows != 2 then return -1 diff --git a/tests/script/tsim/mnode/basic4.sim b/tests/script/tsim/mnode/basic4.sim index a2b8aa2f96..6739a87b42 100644 --- a/tests/script/tsim/mnode/basic4.sim +++ b/tests/script/tsim/mnode/basic4.sim @@ -45,49 +45,12 @@ endi if $data(2)[4] != ready then goto step2 endi - -system sh/exec.sh -n dnode3 -s stop -sql_error create mnode on dnode 3 - -print =============== step3: select * from information_schema.ins_mnodes - -$x = 0 -step3: - $x = $x + 1 - sleep 1000 - if $x == 10 then - return -1 - endi -sql select * from information_schema.ins_mnodes -x step3 -print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] -print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] -print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] - -$leaderNum = 0 -if $data(1)[2] == leader then - $leaderNum = 1 -endi -if $data(2)[2] == leader then - $leaderNum = 1 -endi -if $leaderNum == 0 then - goto step3 -endi -if $data(3)[2] != offline then - goto step3 -endi -if $data(1)[3] != ready then - goto step3 -endi -if $data(2)[3] != ready then - goto step3 -endi -if $data(3)[3] != creating then - goto step3 +if $data(3)[4] != ready then + goto step2 endi -print =============== step4: start dnode3 -system sh/exec.sh -n dnode3 -s start +print =============== step4: create mnode 3 +sql create mnode on dnode 3 $x = 0 step4: @@ -159,12 +122,11 @@ endi if $data(2)[3] != ready then goto step5 endi -if $data(3)[3] != dropping then - goto step5 -endi print =============== step6: start dnode3 system sh/exec.sh -n dnode3 -s start +sql drop mnode on dnode 1 -x step60 +step60: $x = 0 step6: diff --git a/tests/script/tsim/mnode/basic5.sim b/tests/script/tsim/mnode/basic5.sim index 387f38a717..16e8fa3dfa 100644 --- a/tests/script/tsim/mnode/basic5.sim +++ b/tests/script/tsim/mnode/basic5.sim @@ -232,12 +232,11 @@ endi if $leaderNum != 1 then goto step81 endi -if $data(1)[3] != dropping then - goto step81 -endi print =============== step9: start mnode1 and wait it dropped system sh/exec.sh -n dnode1 -s start +sql drop mnode on dnode 1 -x step90 +step90: $x = 0 step91: diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 1c6aef6286..2db40b707c 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -232,7 +232,7 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 -# unstable python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py -N 5 -M 3 @@ -248,8 +248,8 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 5 - python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py -# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 -# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 +python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopFollowerLeader.py -N 5 -M 3 -- GitLab