diff --git a/.clang-format b/.clang-format index 3ddd8b43f6238f30000b96d8e676892ab2dcec68..f60fd3cb2629f933b6e25452c65716799e416c9e 100644 --- a/.clang-format +++ b/.clang-format @@ -86,5 +86,6 @@ SpacesInSquareBrackets: false Standard: Auto TabWidth: 8 UseTab: Never +AlignConsecutiveDeclarations: true ... diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index ec51a67808aafdc2c183111a785cf7d0ad3013ef..a3fa4cf9d490c86deef47bb854e2e8c0c318f896 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -922,18 +922,15 @@ typedef struct SShowRsp { typedef struct { char ep[TSDB_EP_LEN]; // end point, hostname:port - int32_t reserve[8]; } SCreateDnodeMsg; typedef struct { int32_t dnodeId; - int32_t reserve[8]; } SDropDnodeMsg; typedef struct { int32_t dnodeId; char config[TSDB_DNODE_CONFIG_LEN]; - int32_t reserve[8]; } SCfgDnodeMsg; typedef struct { @@ -942,7 +939,6 @@ typedef struct { typedef struct { int32_t dnodeId; - int8_t align[3]; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; } SCreateMnodeInMsg, SAlterMnodeInMsg; diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 3cf08e619ea4fd80a26f739f2b1248c490c0dd43..c4d69c4626a8ad4c5f48920e81d0149bd499fb08 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -349,7 +349,7 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; pReplica->port = pDnode->opt.serverPort; - tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); + memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->selfIndex = pOption->selfIndex; @@ -376,7 +376,7 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC SReplica *pReplica = &pOption->replicas[i]; pReplica->id = pMsg->replicas[i].id; pReplica->port = pMsg->replicas[i].port; - tstrncpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); + memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); if (pReplica->id == pOption->dnodeId) { pOption->selfIndex = i; } @@ -479,9 +479,11 @@ static int32_t dndDropMnode(SDnode *pDnode) { return -1; } + dndReleaseMnode(pDnode, pMnode); dndStopMnodeWorker(pDnode); dndWriteMnodeFile(pDnode); mndClose(pMnode); + pMgmt->pMnode = NULL; mndDestroy(pDnode->dir.mnode); return 0; @@ -499,7 +501,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { } static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -515,18 +517,23 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; - } else { - SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { - return -1; - } - return dndAlterMnode(pDnode, &option); } + + SMnodeOpt option = {0}; + if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { + return -1; + } + + if (dndAlterMnode(pDnode, &option) != 0) { + return -1; + } + + return dndWriteMnodeFile(pDnode); } static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { @@ -555,16 +562,17 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { code = dndProcessDropMnodeReq(pDnode, pMsg); break; default: - code = TSDB_CODE_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = -1; break; } if (pMsg->msgType & 1u) { + if (code != 0) code = terrno; SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; rpcSendResponse(&rsp); } rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; taosFreeQitem(pMsg); } @@ -625,8 +633,6 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { } static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { - assert(pQueue); - SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -647,15 +653,18 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { SMnode *pMnode = dndAcquireMnode(pDnode); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg != NULL) *pMsg = *pRpcMsg; + if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { if (pRpcMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; rpcSendResponse(&rsp); } rpcFreeCont(pRpcMsg->pCont); - pRpcMsg->pCont = NULL; taosFreeQitem(pMsg); } + + dndReleaseMnode(pDnode, pMnode); } void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -894,6 +903,11 @@ int32_t dndInitMnode(SDnode *pDnode) { return -1; } + if (dndAllocMnodeMgmtQueue(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); pMgmt->file = strdup(path); @@ -935,8 +949,9 @@ void dndCleanupMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; dInfo("dnode-mnode start to clean up"); - dndStopMnodeWorker(pDnode); + if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode); + dndFreeMnodeMgmtQueue(pDnode); tfree(pMgmt->file); mndClose(pMgmt->pMnode); dInfo("dnode-mnode is cleaned up"); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index f8556c3e207baf7c6dabdfa0e46420410005331a..3d797eba8f0743886d4caa2205ff7588e620670c 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -140,7 +140,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { (*fp)(pDnode, pMsg, pEpSet); - dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); + dTrace("RPC %p, rsp:%s is processed, code:0x%x", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); } else { dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); rpcFreeCont(pMsg->pCont); @@ -188,7 +188,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle); + dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); dndProcessDnodeReq(pDnode, pMsg, pEpSet); return; } diff --git a/source/dnode/mgmt/impl/test/CMakeLists.txt b/source/dnode/mgmt/impl/test/CMakeLists.txt index a29926c8026950028690bf3d07fa224ee1dd5084..dcce270d7ddd30478e84cb848c0e73d8b4de5563 100644 --- a/source/dnode/mgmt/impl/test/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/CMakeLists.txt @@ -7,7 +7,7 @@ add_subdirectory(cluster) add_subdirectory(db) add_subdirectory(dnode) # add_subdirectory(func) -# add_subdirectory(mnode) +add_subdirectory(mnode) add_subdirectory(profile) add_subdirectory(show) add_subdirectory(stb) diff --git a/source/dnode/mgmt/impl/test/mnode/CMakeLists.txt b/source/dnode/mgmt/impl/test/mnode/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..d6b3b16fb68743aa3c9503fcdc83369ab433c953 --- /dev/null +++ b/source/dnode/mgmt/impl/test/mnode/CMakeLists.txt @@ -0,0 +1,11 @@ +aux_source_directory(. MTEST_SRC) +add_executable(dnode_test_mnode ${MTEST_SRC}) +target_link_libraries( + dnode_test_mnode + PUBLIC sut +) + +add_test( + NAME dnode_test_mnode + COMMAND dnode_test_mnode +) diff --git a/source/dnode/mgmt/impl/test/mnode/mnode.cpp b/source/dnode/mgmt/impl/test/mnode/mnode.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6724c8550036b22508e5398834e4561afefd0a8c --- /dev/null +++ b/source/dnode/mgmt/impl/test/mnode/mnode.cpp @@ -0,0 +1,300 @@ +/** + * @file dnode.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module dnode-msg tests + * @version 0.1 + * @date 2021-12-15 + * + * @copyright Copyright (c) 2021 + * + */ + +#include "base.h" + +class DndTestMnode : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} + + public: + static void SetUpTestSuite() { + test.Init("/tmp/dnode_test_mnode1", 9061); + const char* fqdn = "localhost"; + const char* firstEp = "localhost:9061"; + + server2.Start("/tmp/dnode_test_mnode2", fqdn, 9062, firstEp); + server3.Start("/tmp/dnode_test_mnode3", fqdn, 9063, firstEp); + server4.Start("/tmp/dnode_test_mnode4", fqdn, 9064, firstEp); + server5.Start("/tmp/dnode_test_mnode5", fqdn, 9065, firstEp); + taosMsleep(300); + } + + static void TearDownTestSuite() { + server2.Stop(); + server3.Stop(); + server4.Stop(); + server5.Stop(); + test.Cleanup(); + } + + static Testbase test; + static TestServer server2; + static TestServer server3; + static TestServer server4; + static TestServer server5; +}; + +Testbase DndTestMnode::test; +TestServer DndTestMnode::server2; +TestServer DndTestMnode::server3; +TestServer DndTestMnode::server4; +TestServer DndTestMnode::server5; + +TEST_F(DndTestMnode, 01_ShowDnode) { + test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, ""); + CHECK_META("show mnodes", 5); + + CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id"); + CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint"); + CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, 12 + VARSTR_HEADER_SIZE, "role"); + CHECK_SCHEMA(3, TSDB_DATA_TYPE_TIMESTAMP, 8, "role_time"); + CHECK_SCHEMA(4, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); + + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckInt16(1); + CheckBinary("localhost:9061", TSDB_EP_LEN); + CheckBinary("master", 12); + CheckInt64(0); + CheckTimestamp(); +} + +TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { + { + int32_t contLen = sizeof(SCreateMnodeMsg); + + SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MNODE_ALREADY_EXIST); + } +} + +TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) { + { + int32_t contLen = sizeof(SCreateMnodeMsg); + + SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + } +} + +TEST_F(DndTestMnode, 04_Create_Mnode) { + { + // create dnode + int32_t contLen = sizeof(SCreateDnodeMsg); + + SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); + strcpy(pReq->ep, "localhost:9062"); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + taosMsleep(1300); + test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 2); + } + + { + // create mnode + int32_t contLen = sizeof(SCreateMnodeMsg); + + SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, ""); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 2); + + CheckInt16(1); + CheckInt16(2); + CheckBinary("localhost:9061", TSDB_EP_LEN); + CheckBinary("localhost:9062", TSDB_EP_LEN); + CheckBinary("master", 12); + CheckBinary("slave", 12); + CheckInt64(0); + CheckInt64(0); + CheckTimestamp(); + CheckTimestamp(); + } + + { + // drop mnode + int32_t contLen = sizeof(SDropMnodeMsg); + + SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, ""); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckInt16(1); + CheckBinary("localhost:9061", TSDB_EP_LEN); + CheckBinary("master", 12); + CheckInt64(0); + CheckTimestamp(); + } +} +// { +// int32_t contLen = sizeof(SDropDnodeMsg); + +// SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen); +// pReq->dnodeId = htonl(2); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); +// CHECK_META("show dnodes", 7); +// test.SendShowRetrieveMsg(); +// EXPECT_EQ(test.GetShowRows(), 1); + +// CheckInt16(1); +// CheckBinary("localhost:9061", TSDB_EP_LEN); +// CheckInt16(0); +// CheckInt16(1); +// CheckBinary("ready", 10); +// CheckTimestamp(); +// CheckBinary("", 24); + +// { +// int32_t contLen = sizeof(SCreateDnodeMsg); + +// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); +// strcpy(pReq->ep, "localhost:9063"); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// { +// int32_t contLen = sizeof(SCreateDnodeMsg); + +// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); +// strcpy(pReq->ep, "localhost:9064"); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// { +// int32_t contLen = sizeof(SCreateDnodeMsg); + +// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); +// strcpy(pReq->ep, "localhost:9065"); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// taosMsleep(1300); +// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); +// CHECK_META("show dnodes", 7); +// test.SendShowRetrieveMsg(); +// EXPECT_EQ(test.GetShowRows(), 4); + +// CheckInt16(1); +// CheckInt16(3); +// CheckInt16(4); +// CheckInt16(5); +// CheckBinary("localhost:9061", TSDB_EP_LEN); +// CheckBinary("localhost:9063", TSDB_EP_LEN); +// CheckBinary("localhost:9064", TSDB_EP_LEN); +// CheckBinary("localhost:9065", TSDB_EP_LEN); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); + +// // restart +// uInfo("stop all server"); +// test.Restart(); +// server2.Restart(); +// server3.Restart(); +// server4.Restart(); +// server5.Restart(); + +// taosMsleep(1300); +// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); +// CHECK_META("show dnodes", 7); +// test.SendShowRetrieveMsg(); +// EXPECT_EQ(test.GetShowRows(), 4); + +// CheckInt16(1); +// CheckInt16(3); +// CheckInt16(4); +// CheckInt16(5); +// CheckBinary("localhost:9061", TSDB_EP_LEN); +// CheckBinary("localhost:9063", TSDB_EP_LEN); +// CheckBinary("localhost:9064", TSDB_EP_LEN); +// CheckBinary("localhost:9065", TSDB_EP_LEN); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); +// } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index da5492057458b09aed332168919ce1b38a46e959..a0404d5bccf9f12508373219953a60960f03045b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -290,11 +290,6 @@ typedef struct SMnodeMsg { char db[TSDB_FULL_DB_NAME_LEN]; int32_t acctId; SMnode *pMnode; - int16_t received; - int16_t successed; - int16_t expected; - int16_t retry; - int32_t code; int64_t createdTime; SRpcMsg rpcMsg; int32_t contLen; diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index 906d11aec2b123a7cffc6a7a9d5fd28a44afe356..5df13915632469075f995fd33c2b36ee79aa9271 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -27,6 +27,7 @@ void mndCleanupMnode(SMnode *pMnode); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); char *mndGetRoleStr(int32_t role); +void mndUpdateMnodeRole(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 108896c121eeb1d3d93ffd3a1a9c93cc6f413235..dd474d85f3cd3cf106446cf243442b2d1c964921 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -828,9 +828,9 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pSync->db); if (pDb == NULL) { - mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr()); + mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr()); return -1; } @@ -841,9 +841,9 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db); if (pDb == NULL) { - mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr()); + mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 869b6e538b711b8782144041c450a410210fb67a..a019c0dc55406aa20841d28180e8104e2657b245 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -23,14 +23,15 @@ #define TSDB_MNODE_RESERVE_SIZE 64 static int32_t mndCreateDefaultMnode(SMnode *pMnode); -static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj); +static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj); static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw); -static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj); -static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj); +static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); +static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode); -static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg); +static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg); +static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); @@ -46,9 +47,10 @@ int32_t mndInitMnode(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, .deleteFp = (SdbDeleteFp)mndMnodeActionDelete}; - mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg); - mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeReq); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP, mndProcessCreateMnodeRsp); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP, mndProcessAlterMnodeRsp); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE_IN_RSP, mndProcessDropMnodeRsp); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta); @@ -69,9 +71,9 @@ static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { return pObj; } -static void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pMnodeObj) { +static void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { SSdb *pSdb = pMnode->pSdb; - sdbRelease(pSdb, pMnodeObj); + sdbRelease(pSdb, pObj); } char *mndGetRoleStr(int32_t showType) { @@ -87,6 +89,24 @@ char *mndGetRoleStr(int32_t showType) { } } +void mndUpdateMnodeRole(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SMnodeObj *pObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); + if (pIter == NULL) break; + + if (pObj->id == 1) { + pObj->role = TAOS_SYNC_STATE_LEADER; + } else { + pObj->role = TAOS_SYNC_STATE_CANDIDATE; + } + + sdbRelease(pSdb, pObj); + } +} + static int32_t mndCreateDefaultMnode(SMnode *pMnode) { SMnodeObj mnodeObj = {0}; mnodeObj.id = 1; @@ -101,14 +121,14 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } -static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj) { +static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) { SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, TSDB_MNODE_VER_NUMBER, sizeof(SMnodeObj) + TSDB_MNODE_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pMnodeObj->id); - SDB_SET_INT64(pRaw, dataPos, pMnodeObj->createdTime) - SDB_SET_INT64(pRaw, dataPos, pMnodeObj->updateTime) + SDB_SET_INT32(pRaw, dataPos, pObj->id); + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime) SDB_SET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE) return pRaw; @@ -125,42 +145,38 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { } SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); - SMnodeObj *pMnodeObj = sdbGetRowObj(pRow); - if (pMnodeObj == NULL) return NULL; + SMnodeObj *pObj = sdbGetRowObj(pRow); + if (pObj == NULL) return NULL; int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pMnodeObj->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->updateTime) + SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime) SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_MNODE_RESERVE_SIZE) return pRow; } -static void mnodeResetMnode(SMnodeObj *pMnodeObj) { - pMnodeObj->role = TAOS_SYNC_STATE_FOLLOWER; - pMnodeObj->roleTerm = 0; - pMnodeObj->roleTime = 0; -} +static void mnodeResetMnode(SMnodeObj *pObj) { pObj->role = TAOS_SYNC_STATE_FOLLOWER; } -static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) { - mTrace("mnode:%d, perform insert action", pMnodeObj->id); - pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id); - if (pMnodeObj->pDnode == NULL) { +static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { + mTrace("mnode:%d, perform insert action", pObj->id); + pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); + if (pObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - mError("mnode:%d, failed to perform insert action since %s", pMnodeObj->id, terrstr()); + mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr()); return -1; } - mnodeResetMnode(pMnodeObj); + mnodeResetMnode(pObj); return 0; } -static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { - mTrace("mnode:%d, perform delete action", pMnodeObj->id); - if (pMnodeObj->pDnode != NULL) { - sdbRelease(pSdb, pMnodeObj->pDnode); - pMnodeObj->pDnode = NULL; +static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { + mTrace("mnode:%d, perform delete action", pObj->id); + if (pObj->pDnode != NULL) { + sdbRelease(pSdb, pObj->pDnode); + pObj->pDnode = NULL; } return 0; @@ -168,8 +184,6 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode) { mTrace("mnode:%d, perform update action", pOldMnode->id); - pOldMnode->id = pNewMnode->id; - pOldMnode->createdTime = pNewMnode->createdTime; pOldMnode->updateTime = pNewMnode->updateTime; return 0; } @@ -177,12 +191,12 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { SSdb *pSdb = pMnode->pSdb; - SMnodeObj *pMnodeObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId); - if (pMnodeObj == NULL) { + SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId); + if (pObj == NULL) { return false; } - sdbRelease(pSdb, pMnodeObj); + sdbRelease(pSdb, pObj); return true; } @@ -193,14 +207,14 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { void *pIter = NULL; while (1) { - SMnodeObj *pMnodeObj = NULL; - pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMnodeObj); + SMnodeObj *pObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); if (pIter == NULL) break; - if (pMnodeObj->pDnode == NULL) break; + if (pObj->pDnode == NULL) break; - pEpSet->port[pEpSet->numOfEps] = htons(pMnodeObj->pDnode->port); - tstrncpy(pEpSet->fqdn[pEpSet->numOfEps], pMnodeObj->pDnode->fqdn, TSDB_FQDN_LEN); - if (pMnodeObj->role == TAOS_SYNC_STATE_LEADER) { + pEpSet->port[pEpSet->numOfEps] = htons(pObj->pDnode->port); + memcpy(pEpSet->fqdn[pEpSet->numOfEps], pObj->pDnode->fqdn, TSDB_FQDN_LEN); + if (pObj->role == TAOS_SYNC_STATE_LEADER) { pEpSet->inUse = pEpSet->numOfEps; } @@ -208,54 +222,153 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) { +static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; + + SCreateMnodeInMsg createMsg = {0}; + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + + SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + pReplica->id = htonl(pMObj->id); + pReplica->port = htons(pMObj->pDnode->port); + memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; + + sdbRelease(pSdb, pMObj); + } + + SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + pReplica->id = htonl(pDnode->id); + pReplica->port = htons(pDnode->port); + memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; + + createMsg.replica = numOfReplicas; + + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + + STransAction action = {0}; + + SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + + pMsg->dnodeId = htonl(pMObj->id); + action.epSet = mndGetDnodeEpset(pMObj->pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SAlterMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_ALTER_MNODE_IN; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + + sdbRelease(pSdb, pMObj); + } + + { + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + + SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg)); + if (pMsg == NULL) return -1; + memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + pMsg->dnodeId = htonl(pObj->id); + + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SCreateMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + } + + return 0; +} + +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) { SMnodeObj mnodeObj = {0}; - mnodeObj.id = 1; // todo + mnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_MNODE); mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { - mError("dnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - return -1; + mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); + goto CREATE_MNODE_OVER; } - mDebug("trans:%d, used to create dnode:%d", pTrans->id, pCreate->dnodeId); + mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); - SSdbRaw *pRedoRaw = mndMnodeActionEncode(&mnodeObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto CREATE_MNODE_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); - SSdbRaw *pUndoRaw = mndMnodeActionEncode(&mnodeObj); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto CREATE_MNODE_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); - SSdbRaw *pCommitRaw = mndMnodeActionEncode(&mnodeObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto CREATE_MNODE_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto CREATE_MNODE_OVER; } + code = 0; + +CREATE_MNODE_OVER: mndTransDrop(pTrans); - return 0; + return code; } -static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { +static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; @@ -263,22 +376,23 @@ static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { mDebug("mnode:%d, start to create", pCreate->dnodeId); - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); - if (pDnode == NULL) { - mError("mnode:%d, dnode not exist", pDnode->id); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + SMnodeObj *pObj = mndAcquireMnode(pMnode, pCreate->dnodeId); + if (pObj != NULL) { + mndReleaseMnode(pMnode, pObj); + mError("mnode:%d, mnode already exist", pObj->id); + terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; return -1; } - mndReleaseDnode(pMnode, pDnode); - SMnodeObj *pMnodeObj = mndAcquireMnode(pMnode, pCreate->dnodeId); - if (pMnodeObj != NULL) { - mError("mnode:%d, mnode already exist", pMnodeObj->id); - terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); + if (pDnode == NULL) { + mError("mnode:%d, dnode not exist", pCreate->dnodeId); + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return -1; } - int32_t code = mndCreateMnode(pMnode, pMsg, pCreate); + int32_t code = mndCreateMnode(pMnode, pMsg, pDnode, pCreate); + mndReleaseDnode(pMnode, pDnode); if (code != 0) { mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); @@ -288,49 +402,140 @@ static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeObj) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); +static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + return 0; +} + +static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; + + SAlterMnodeInMsg alterMsg = {0}; + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + + if (pMObj->id != pObj->id) { + SReplica *pReplica = &alterMsg.replicas[numOfReplicas]; + pReplica->id = htonl(pMObj->id); + pReplica->port = htons(pMObj->pDnode->port); + memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; + } + + sdbRelease(pSdb, pMObj); + } + + alterMsg.replica = numOfReplicas; + + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + if (pMObj->id != pObj->id) { + STransAction action = {0}; + + SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg)); + + pMsg->dnodeId = htonl(pMObj->id); + action.epSet = mndGetDnodeEpset(pMObj->pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SAlterMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_ALTER_MNODE_IN; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + } + + sdbRelease(pSdb, pMObj); + } + + { + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + + SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pMsg->dnodeId = htonl(pObj->id); + + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SDropMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_DROP_MNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + } + + return 0; +} + +static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { - mError("mnode:%d, failed to drop since %s", pMnodeObj->id, terrstr()); - return -1; + mError("mnode:%d, failed to drop since %s", pObj->id, terrstr()); + goto DROP_MNODE_OVER; } - mDebug("trans:%d, used to drop user:%d", pTrans->id, pMnodeObj->id); - SSdbRaw *pRedoRaw = mndMnodeActionEncode(pMnodeObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); + + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto DROP_MNODE_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); - SSdbRaw *pUndoRaw = mndMnodeActionEncode(pMnodeObj); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto DROP_MNODE_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - SSdbRaw *pCommitRaw = mndMnodeActionEncode(pMnodeObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_MNODE_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto DROP_MNODE_OVER; } + code = 0; + +DROP_MNODE_OVER: mndTransDrop(pTrans); - return 0; + return code; } -static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { +static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); @@ -343,14 +548,14 @@ static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return -1; } - SMnodeObj *pMnodeObj = mndAcquireMnode(pMnode, pDrop->dnodeId); - if (pMnodeObj == NULL) { + SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId); + if (pObj == NULL) { mError("mnode:%d, not exist", pDrop->dnodeId); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return -1; } - int32_t code = mndDropMnode(pMnode, pMsg, pMnodeObj); + int32_t code = mndDropMnode(pMnode, pMsg, pObj); if (code != 0) { mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); @@ -361,9 +566,20 @@ static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; @@ -414,6 +630,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; strcpy(pMeta->tbFname, mndShowStr(pShow->type)); + mndUpdateMnodeRole(pMnode); return 0; } @@ -422,46 +639,39 @@ static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; - SMnodeObj *pMnodeObj = NULL; + SMnodeObj *pObj = NULL; char *pWrite; while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pMnodeObj); + pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj); if (pShow->pIter == NULL) break; cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pMnodeObj->id; + *(int16_t *)pWrite = pObj->id; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pMnodeObj->id); - if (pDnode != NULL) { - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->ep, pShow->bytes[cols]); - } else { - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols]); - } - mndReleaseDnode(pMnode, pDnode); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char *roles = mndGetRoleStr(pMnodeObj->role); + char *roles = mndGetRoleStr(pObj->role); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pMnodeObj->roleTime; + *(int64_t *)pWrite = pObj->roleTime; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pMnodeObj->createdTime; + *(int64_t *)pWrite = pObj->createdTime; cols++; numOfRows++; - sdbRelease(pSdb, pMnodeObj); + sdbRelease(pSdb, pObj); } mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 59161b32f290ff7c2eb9c4b724c8d43257f51ca0..5e9165f8980ae9946f86273c6931481a85a49145 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -33,4 +33,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { return code; } -bool mndIsMaster(SMnode *pMnode) { return true; } \ No newline at end of file +bool mndIsMaster(SMnode *pMnode) { + // pMnode->role = TAOS_SYNC_STATE_LEADER; + return true; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 54cd6ab501a0a9294b0f35e575c365d44e14b7bd..53e3bc52036dee574f43dcd96889146bdc312445 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -622,10 +622,10 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction != NULL) { pAction->msgReceived = 1; - pAction->errCode = pMsg->code; + pAction->errCode = pMsg->rpcMsg.code; } - mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); + mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code); mndTransExecute(pMnode, pTrans); HANDLE_ACTION_RSP_OVER: @@ -696,7 +696,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent) continue; + if (pAction->msgReceived && pAction->errCode == 0) continue; int64_t signature = pTrans->id; signature = (signature << 32); @@ -736,6 +736,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA terrno = errorCode; return errorCode; } else { + mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceivedMsgs, numOfActions, errorCode); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index fb0b95dc4a696f9a7b2751f3735d73d059838c60..a62a0a9296c120bb0d34c18461f10f1a18f4e0a5 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -178,8 +178,10 @@ static int32_t mndExecSteps(SMnode *pMnode) { // (*pMnode->reportProgress)(pStep->name, "start initialize"); if ((*pStep->initFp)(pMnode) != 0) { + int32_t code = terrno; mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr()); mndCleanupSteps(pMnode, pos); + terrno = code; return -1; } else { mDebug("step:%s is initialized", pStep->name); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 77614e399e1c8940b7b3eada86ce28b798f16732..bb0e606463dcacd209692f5bdcf41f53600e6ee8 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "sdbInt.h" +static int32_t sdbCreateDir(SSdb *pSdb); + SSdb *sdbInit(SSdbOpt *pOption) { mDebug("start to init sdb in %s", pOption->path); @@ -40,6 +42,11 @@ SSdb *sdbInit(SSdbOpt *pOption) { return NULL; } + if (sdbCreateDir(pSdb) != 0) { + sdbCleanup(pSdb); + return NULL; + } + for (ESdbType i = 0; i < SDB_MAX; ++i) { taosInitRWLatch(&pSdb->locks[i]); } @@ -53,8 +60,8 @@ void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); // if (pSdb->curVer != pSdb->lastCommitVer) { - mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); - sdbWriteFile(pSdb); + mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); + sdbWriteFile(pSdb); // } if (pSdb->currDir != NULL) { @@ -133,4 +140,26 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { mDebug("sdb table:%d is initialized", sdbType); return 0; -} \ No newline at end of file +} + +static int32_t sdbCreateDir(SSdb *pSdb) { + if (taosMkDir(pSdb->currDir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", pSdb->currDir, terrstr()); + return -1; + } + + if (taosMkDir(pSdb->syncDir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr()); + return -1; + } + + if (taosMkDir(pSdb->tmpDir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr()); + return -1; + } + + return 0; +} diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index af37e9e1d5a60f8a609304ddcc956e30f9ef7d31..7828e39e566cdc62fcdecdabbae6b0aa52414605 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -17,28 +17,6 @@ #include "sdbInt.h" #include "tchecksum.h" -static int32_t sdbCreateDir(SSdb *pSdb) { - if (taosMkDir(pSdb->currDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->currDir, terrstr()); - return -1; - } - - if (taosMkDir(pSdb->syncDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr()); - return -1; - } - - if (taosMkDir(pSdb->tmpDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr()); - return -1; - } - - return 0; -} - static int32_t sdbRunDeployFp(SSdb *pSdb) { mDebug("start to deploy sdb"); @@ -77,7 +55,7 @@ int32_t sdbReadFile(SSdb *pSdb) { free(pRaw); terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to read file:%s since %s", file, terrstr()); - return -1; + return 0; } while (1) { @@ -225,10 +203,6 @@ int32_t sdbWriteFile(SSdb *pSdb) { } int32_t sdbDeploy(SSdb *pSdb) { - if (sdbCreateDir(pSdb) != 0) { - return -1; - } - if (sdbRunDeployFp(pSdb) != 0) { return -1; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 11972e84cba613ed6dd757d91c6083eb6c0b520d..fb7b71b845e5f5a5584f651e14b20d67fa3d6eb4 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -38,6 +38,7 @@ int32_t tWorkerInit(SWorkerPool *pool) { void tWorkerCleanup(SWorkerPool *pool) { for (int i = 0; i < pool->max; ++i) { SWorker *worker = pool->workers + i; + if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } @@ -45,6 +46,7 @@ void tWorkerCleanup(SWorkerPool *pool) { for (int i = 0; i < pool->max; ++i) { SWorker *worker = pool->workers + i; + if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); }