diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 8bd3db021773585fa1be0db6198bdbd3948888fd..a42736e4fb337b244d80486a7e8e0e3fa4a79652 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -125,8 +125,7 @@ typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyTy typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, - SDB_STATUS_DROPPING = 3, - SDB_STATUS_DROPPED = 4 + SDB_STATUS_DROPPED = 3 } ESdbStatus; typedef enum { @@ -258,7 +257,7 @@ int32_t sdbDeploy(SSdb *pSdb); int32_t sdbReadFile(SSdb *pSdb); /** - * @brief Parse and write raw data to sdb. + * @brief Parse and write raw data to sdb, then free the pRaw object * * @param pSdb The sdb object. * @param pRaw The raw data. @@ -266,6 +265,15 @@ int32_t sdbReadFile(SSdb *pSdb); */ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw); +/** + * @brief Parse and write raw data to sdb. + * + * @param pSdb The sdb object. + * @param pRaw The raw data. + * @return int32_t 0 for success, -1 for failure. + */ +int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw); + /** * @brief Acquire a row from sdb * diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index ad1667eac566ddf389fa4df0c6b656041f3439b2..92b0967345eb2a94f5f059e041c70abe704cccd7 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -34,7 +34,6 @@ void initLog(const char* path) { sDebugFlag = 0; tsdbDebugFlag = 0; cqDebugFlag = 0; - debugFlag = 0; char temp[PATH_MAX]; snprintf(temp, PATH_MAX, "%s/taosdlog", path); diff --git a/source/dnode/mgmt/impl/test/user/CMakeLists.txt b/source/dnode/mgmt/impl/test/user/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b5f02e41f4c594e353ce02b74f1feb42de205683 --- /dev/null +++ b/source/dnode/mgmt/impl/test/user/CMakeLists.txt @@ -0,0 +1,29 @@ +add_executable(dndTestUser "") + +target_sources(dndTestUser + PRIVATE + "user.cpp" + "../sut/deploy.cpp" +) + +target_link_libraries( + dndTestUser + PUBLIC dnode + PUBLIC util + PUBLIC os + PUBLIC gtest_main +) + +target_include_directories(dndTestUser + PUBLIC + "${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt" + "${CMAKE_CURRENT_SOURCE_DIR}/../../inc" + "${CMAKE_CURRENT_SOURCE_DIR}/../sut" +) + +enable_testing() + +add_test( + NAME dndTestUser + COMMAND dndTestUser +) diff --git a/source/dnode/mgmt/impl/test/user/user.cpp b/source/dnode/mgmt/impl/test/user/user.cpp new file mode 100644 index 0000000000000000000000000000000000000000..befe005d559a4ccbed52d61848d42186f65a9c7b --- /dev/null +++ b/source/dnode/mgmt/impl/test/user/user.cpp @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "deploy.h" + +class DndTestUser : public ::testing::Test { + protected: + void SetUp() override {} + void TearDown() override {} + + static void SetUpTestSuite() { + const char* user = "root"; + const char* pass = "taosdata"; + const char* path = "/tmp/dndTestUser"; + const char* fqdn = "localhost"; + uint16_t port = 9524; + + pServer = createServer(path, fqdn, port); + ASSERT(pServer); + pClient = createClient(user, pass, fqdn, port); + } + + static void TearDownTestSuite() { + dropServer(pServer); + dropClient(pClient); + } + + static SServer* pServer; + static SClient* pClient; + static int32_t connId; +}; + +SServer* DndTestUser::pServer; +SClient* DndTestUser::pClient; +int32_t DndTestUser::connId; + +#if 0 +TEST_F(DndTestUser, ShowUser) { + int32_t showId = 0; + + //--- meta --- + SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pShow->type = TSDB_MGMT_TABLE_USER; + strcpy(pShow->db, ""); + + SRpcMsg showRpcMsg = {0}; + showRpcMsg.pCont = pShow; + showRpcMsg.contLen = sizeof(SShowMsg); + showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &showRpcMsg); + ASSERT_NE(pClient->pRsp, nullptr); + + SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; + ASSERT_NE(pShowRsp, nullptr); + pShowRsp->showId = htonl(pShowRsp->showId); + STableMetaMsg* pMeta = &pShowRsp->tableMeta; + pMeta->contLen = htonl(pMeta->contLen); + pMeta->numOfColumns = htons(pMeta->numOfColumns); + pMeta->sversion = htons(pMeta->sversion); + pMeta->tversion = htons(pMeta->tversion); + pMeta->tid = htonl(pMeta->tid); + pMeta->uid = htobe64(pMeta->uid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pShowRsp->showId; + + EXPECT_NE(pShowRsp->showId, 0); + EXPECT_EQ(pMeta->contLen, 0); + EXPECT_STREQ(pMeta->tableFname, "show users"); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->numOfColumns, 4); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tid, 0); + EXPECT_EQ(pMeta->uid, 0); + EXPECT_STREQ(pMeta->sTableName, ""); + EXPECT_EQ(pMeta->suid, 0); + + SSchema* pSchema = NULL; + + pSchema = &pMeta->schema[0]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "name"); + + pSchema = &pMeta->schema[1]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, 10 + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "privilege"); + + pSchema = &pMeta->schema[2]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); + EXPECT_EQ(pSchema->bytes, 8); + EXPECT_STREQ(pSchema->name, "create_time"); + + pSchema = &pMeta->schema[3]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "account"); + + //--- retrieve --- + SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pRetrieve->showId = htonl(showId); + pRetrieve->free = 0; + + SRpcMsg retrieveRpcMsg = {0}; + retrieveRpcMsg.pCont = pRetrieve; + retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); + retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &retrieveRpcMsg); + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + + SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; + ASSERT_NE(pRetrieveRsp, nullptr); + pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); + pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset); + pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); + pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); + + EXPECT_EQ(pRetrieveRsp->numOfRows, 2); + EXPECT_EQ(pRetrieveRsp->offset, 0); + EXPECT_EQ(pRetrieveRsp->useconds, 0); + EXPECT_EQ(pRetrieveRsp->completed, 1); + EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRetrieveRsp->compressed, 0); + EXPECT_EQ(pRetrieveRsp->reserved, 0); + EXPECT_EQ(pRetrieveRsp->compLen, 0); + + char* pData = pRetrieveRsp->data; + int32_t pos = 0; + char* strVal = NULL; + int64_t int64Val = 0; + + //--- name --- + { + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "root"); + + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "_root"); + } + + //--- privilege --- + { + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += 10; + EXPECT_STREQ(strVal, "super"); + + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += 10; + EXPECT_STREQ(strVal, "writable"); + } + + //--- create_time --- + { + int64Val = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_GT(int64Val, 0); + + int64Val = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_GT(int64Val, 0); + } + + //--- account --- + { + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "root"); + + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "root"); + } +} +#endif + +TEST_F(DndTestUser, CreateUser_01) { + ASSERT_NE(pClient, nullptr); + + //--- create user --- + SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg)); + strcpy(pReq->user, "u1"); + strcpy(pReq->pass, "p1"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateUserMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER; + + sendMsg(pClient, &rpcMsg); + // taosMsleep(10000000); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + //--- meta --- + SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pShow->type = TSDB_MGMT_TABLE_USER; + SRpcMsg showRpcMsg = {0}; + showRpcMsg.pCont = pShow; + showRpcMsg.contLen = sizeof(SShowMsg); + showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &showRpcMsg); + SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; + STableMetaMsg* pMeta = &pShowRsp->tableMeta; + pMeta->numOfColumns = htons(pMeta->numOfColumns); + EXPECT_EQ(pMeta->numOfColumns, 4); + + //--- retrieve --- + SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pRetrieve->showId = pShowRsp->showId; + SRpcMsg retrieveRpcMsg = {0}; + retrieveRpcMsg.pCont = pRetrieve; + retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); + retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &retrieveRpcMsg); + SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; + pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); + EXPECT_EQ(pRetrieveRsp->numOfRows, 3); + + char* pData = pRetrieveRsp->data; + int32_t pos = 0; + char* strVal = NULL; + + //--- name --- + { + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "u1"); + + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "root"); + + pos += sizeof(VarDataLenT); + strVal = (char*)(pData + pos); + pos += TSDB_USER_LEN; + EXPECT_STREQ(strVal, "_root"); + } +} + +// TEST_F(DndTestUser, AlterUser) { +// ASSERT_NE(pClient, nullptr); + +// SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg)); + +// SRpcMsg rpcMsg = {0}; +// rpcMsg.pCont = pReq; +// rpcMsg.contLen = sizeof(SAlterUserMsg); +// rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_ACCT; + +// sendMsg(pClient, &rpcMsg); +// SRpcMsg* pMsg = pClient->pRsp; +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); +// } + +// TEST_F(DndTestUser, DropUser) { +// ASSERT_NE(pClient, nullptr); + +// SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg)); + +// SRpcMsg rpcMsg = {0}; +// rpcMsg.pCont = pReq; +// rpcMsg.contLen = sizeof(SDropUserMsg); +// rpcMsg.msgType = TSDB_MSG_TYPE_DROP_ACCT; + +// sendMsg(pClient, &rpcMsg); +// SRpcMsg* pMsg = pClient->pRsp; +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); +// } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9e4f9225408fcc9429d74c5f19007e2cb8d92659..cb5c0c27554a3ecbcf82246a935c52f7ebe119a7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -318,6 +318,11 @@ typedef struct SMnodeMsg { void *pCont; } SMnodeMsg; +typedef struct { + int32_t id; + void *rpcHandle; +} STransMsg; + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index 3e45b0adb13658d11d3bf467d727cef02dd0afbb..02ba725be14c0de214d767c2e97b6c42e7727f5d 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode); -int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData); +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 9559a9625558be2adad19e0177ddd5ebbf6bde60..878337e4be7c10564f9f7f7d7941e526c3a73239 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -33,13 +33,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg); int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg); -int32_t mndTransPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)); -int32_t mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, void *pData, int32_t code); +int32_t mndTransPrepare(STrans *pTrans); +void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); int32_t mndTransExecute(SSdb *pSdb, int32_t tranId); -SSdbRaw *mndTransActionEncode(STrans *pTrans); -SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index e3d37cd9f9fe326144a3594bb705b16c7fb78dab..3d668a76f8373fa019f4af41d2e54bb5d26369fb 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -65,7 +65,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("acct:%s, will be created while deploy sdb", acctObj.acct); + mDebug("acct:%s, will be created while deploy sdb", acctObj.acct); return sdbWrite(pMnode->pSdb, pRaw); } @@ -132,14 +132,14 @@ static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { } static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { - mTrace("acct:%s, perform update action", pSrcAcct->acct); - - memcpy(pSrcAcct->acct, pDstAcct->acct, TSDB_USER_LEN); - pSrcAcct->createdTime = pDstAcct->createdTime; - pSrcAcct->updateTime = pDstAcct->updateTime; - pSrcAcct->acctId = pDstAcct->acctId; - pSrcAcct->status = pDstAcct->status; - pSrcAcct->cfg = pDstAcct->cfg; + mTrace("acct:%s, perform update action", pDstAcct->acct); + + memcpy(pDstAcct->acct, pSrcAcct->acct, TSDB_USER_LEN); + pDstAcct->createdTime = pSrcAcct->createdTime; + pDstAcct->updateTime = pSrcAcct->updateTime; + pDstAcct->acctId = pSrcAcct->acctId; + pDstAcct->status = pSrcAcct->status; + pDstAcct->cfg = pSrcAcct->cfg; return 0; } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 9fa7a066306308af24db8f31e7371349cf596696..f260c83538c3ff04be3bdb81fcda878fa0c381be 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -108,7 +108,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { } static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) { - mTrace("cluster:%d, perform update action", pSrcCluster->id); + mTrace("cluster:%d, perform update action", pDstCluster->id); return 0; } @@ -132,7 +132,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("cluster:%d, will be created while deploy sdb", clusterObj.id); + mDebug("cluster:%d, will be created while deploy sdb", clusterObj.id); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 241b006293000a47eaf5650b52a692910e550540..356b3073bf5a1b03b03e6c44dfbf14cbecddb12c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -102,12 +102,12 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { } static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) { - mTrace("dnode:%d, perform update action", pSrcDnode->id); - pSrcDnode->id = pDstDnode->id; - pSrcDnode->createdTime = pDstDnode->createdTime; - pSrcDnode->updateTime = pDstDnode->updateTime; - pSrcDnode->port = pDstDnode->port; - memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); + mTrace("dnode:%d, perform update action", pDstDnode->id); + pDstDnode->id = pSrcDnode->id; + pDstDnode->createdTime = pSrcDnode->createdTime; + pDstDnode->updateTime = pSrcDnode->updateTime; + pDstDnode->port = pSrcDnode->port; + memcpy(pDstDnode->fqdn, pSrcDnode->fqdn, TSDB_FQDN_LEN); return 0; } @@ -123,7 +123,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("dnode:%d, will be created while deploy sdb", dnodeObj.id); + mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index eabcce04c5ca84f10aa0fda10c9e986af80ebabc..6fa55bb9f1caff3d2f4546b5e893710a518933b9 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -82,10 +82,10 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { } static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) { - mTrace("mnode:%d, perform update action", pSrcMnode->id); - pSrcMnode->id = pDstMnode->id; - pSrcMnode->createdTime = pDstMnode->createdTime; - pSrcMnode->updateTime = pDstMnode->updateTime; + mTrace("mnode:%d, perform update action", pDstMnode->id); + pDstMnode->id = pSrcMnode->id; + pDstMnode->createdTime = pSrcMnode->createdTime; + pDstMnode->updateTime = pSrcMnode->updateTime; return 0; } @@ -99,7 +99,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("mnode:%d, will be created while deploy sdb", mnodeObj.id); + mDebug("mnode:%d, will be created while deploy sdb", mnodeObj.id); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 723191531180db0a5b18799538ab3e3023d26fca..6e7ee662f8de514ff886ef0f8493969f9a919ebe 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -21,9 +21,15 @@ int32_t mndInitSync(SMnode *pMnode) { return 0; } void mndCleanupSync(SMnode *pMnode) {} -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, void *pData) { - mndTransApply(pMnode, pData, pData, 0); - free(pData); +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg) { + int32_t code = 0; + + int32_t len = sdbGetRawTotalSize(pRaw); + SSdbRaw *pReceived = calloc(1, len); + memcpy(pReceived, pRaw, len); + mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg); + + mndTransApply(pMnode, pReceived, pMsg, code); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 663ae76506c39a459f856f8479e0dc252083b8dd..5beb1b10e36bc954b9283113b91cf7ca4ab4aa01 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -189,12 +189,12 @@ static void mndSendTelemetryReport(SMnode* pMnode) { char buf[128] = {0}; uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); if (ip == 0xffffffff) { - mTrace("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno)); + mDebug("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno)); return; } SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); if (fd < 0) { - mTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); + mDebug("failed to create socket for telemetry, reason:%s", strerror(errno)); return; } @@ -228,7 +228,7 @@ static void mndSendTelemetryReport(SMnode* pMnode) { // read something to avoid nginx error 499 if (taosReadSocket(fd, buf, 10) < 0) { - mTrace("failed to receive response since %s", strerror(errno)); + mDebug("failed to receive response since %s", strerror(errno)); } taosCloseSocket(fd); @@ -297,7 +297,7 @@ int32_t mndInitTelem(SMnode* pMnode) { int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode); pthread_attr_destroy(&attr); if (code != 0) { - mTrace("failed to create telemetry thread since :%s", strerror(code)); + mDebug("failed to create telemetry thread since :%s", strerror(code)); } mInfo("mnd telemetry is initialized"); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7eab16895cc8281624bf1484c69f8c0260a0f9d1..8d1521c79a8885ce0f1715bf79d280f69ddb6eaf 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -15,13 +15,33 @@ #define _DEFAULT_SOURCE #include "mndTrans.h" -#include "trpc.h" +#include "mndSync.h" #define SDB_TRANS_VER 1 #define TRN_DEFAULT_ARRAY_SIZE 8 -SSdbRaw *mndTransActionEncode(STrans *pTrans) { - int32_t rawDataLen = 10 * sizeof(int32_t); +static SSdbRaw *mndTransActionEncode(STrans *pTrans); +static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); +static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); +static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pTrans, STrans *pDstTrans); +static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); + +int32_t mndInitTrans(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_TRANS, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndTransActionEncode, + .decodeFp = (SdbDecodeFp)mndTransActionDecode, + .insertFp = (SdbInsertFp)mndTransActionInsert, + .updateFp = (SdbUpdateFp)mndTransActionUpdate, + .deleteFp = (SdbDeleteFp)mndTransActionDelete}; + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupTrans(SMnode *pMnode) {} + +static SSdbRaw *mndTransActionEncode(STrans *pTrans) { + int32_t rawDataLen = 16 * sizeof(int32_t); int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); @@ -29,23 +49,23 @@ SSdbRaw *mndTransActionEncode(STrans *pTrans) { int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); for (int32_t i = 0; i < redoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, i); + SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); rawDataLen += sdbGetRawTotalSize(pTmp); } for (int32_t i = 0; i < undoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, i); + SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); rawDataLen += sdbGetRawTotalSize(pTmp); } for (int32_t i = 0; i < commitLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, i); + SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); rawDataLen += sdbGetRawTotalSize(pTmp); } SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen); if (pRaw == NULL) { - mError("trn:%d, failed to alloc raw since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr()); return NULL; } @@ -60,31 +80,33 @@ SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT32(pRaw, dataPos, undoActionNum) for (int32_t i = 0; i < redoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, i); + SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); SDB_SET_INT32(pRaw, dataPos, len) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) } for (int32_t i = 0; i < undoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, i); + SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); SDB_SET_INT32(pRaw, dataPos, len) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) } for (int32_t i = 0; i < commitLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, i); + SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); SDB_SET_INT32(pRaw, dataPos, len) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) } - mDebug("trn:%d, is encoded as raw:%p, len:%d", pTrans->id, pRaw, dataPos); + mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); return pRaw; } -SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { +static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { + int32_t code = 0; + int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) { mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr()); @@ -97,8 +119,8 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); - STrans *pTrans = sdbGetRowObj(pRow); + SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); + STrans *pTrans = sdbGetRowObj(pRow); if (pTrans == NULL) { mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr()); return NULL; @@ -112,9 +134,9 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mDebug("trn:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw); - return NULL; + mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw); + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; } int32_t redoLogNum = 0; @@ -133,81 +155,111 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum) SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum) - int32_t code = 0; for (int32_t i = 0; i < redoLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); - void *ret = taosArrayPush(pTrans->redoLogs, pData); + void *ret = taosArrayPush(pTrans->redoLogs, &pData); + if (ret == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + break; + } + } + + for (int32_t i = 0; i < undoLogNum; ++i) { + int32_t dataLen = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) + + char *pData = malloc(dataLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->undoLogs, &pData); + if (ret == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + break; + } + } + + for (int32_t i = 0; i < commitLogNum; ++i) { + int32_t dataLen = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) + + char *pData = malloc(dataLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->commitLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; break; } } +TRANS_DECODE_OVER: if (code != 0) { - terrno = code; - mError("trn:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr()); + mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno)); mndTransDrop(pTrans); + terrno = code; return NULL; } - mDebug("trn:%d, is parsed from raw:%p", pTrans->id, pRaw); + mTrace("trans:%d, decode from raw:%p", pTrans->id, pRaw); return pRow; } static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { + mTrace("trans:%d, perform insert action, stage:%d", pTrans->id, pTrans->stage); + SArray *pArray = pTrans->redoLogs; int32_t arraySize = taosArrayGetSize(pArray); for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGet(pArray, i); + SSdbRaw *pRaw = taosArrayGetP(pArray, i); int32_t code = sdbWrite(pSdb, pRaw); if (code != 0) { - mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); + mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); return code; } } - - mDebug("trn:%d, write to sdb", pTrans->id); return 0; } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - SArray *pArray = pTrans->redoLogs; + mTrace("trans:%d, perform delete action, stage:%d", pTrans->id, pTrans->stage); + + SArray *pArray = pTrans->undoLogs; int32_t arraySize = taosArrayGetSize(pArray); for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGet(pArray, i); + SSdbRaw *pRaw = taosArrayGetP(pArray, i); int32_t code = sdbWrite(pSdb, pRaw); if (code != 0) { - mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); + mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); return code; } } - mDebug("trn:%d, delete from sdb", pTrans->id); return 0; } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pTrans, STrans *pDstTrans) { - assert(true); - SArray *pArray = pTrans->redoLogs; + mTrace("trans:%d, perform update action, stage:%d", pTrans->id, pTrans->stage); + + SArray *pArray = pDstTrans->commitLogs; int32_t arraySize = taosArrayGetSize(pArray); for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGet(pArray, i); + SSdbRaw *pRaw = taosArrayGetP(pArray, i); int32_t code = sdbWrite(pSdb, pRaw); if (code != 0) { - mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); + mError("trans:%d, failed to write raw:%p to sdb since %s", pDstTrans->id, pRaw, terrstr()); return code; } } - pTrans->stage = pDstTrans->stage; - mDebug("trn:%d, update in sdb", pTrans->id); + pDstTrans->stage = pTrans->stage; return 0; } @@ -224,6 +276,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->id = trnGenerateTransId(); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; + pTrans->pMnode = pMnode; pTrans->rpcHandle = rpcHandle; pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); @@ -238,13 +291,13 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return NULL; } - mDebug("trn:%d, is created, %p", pTrans->id, pTrans); + mDebug("trans:%d, data:%p is created", pTrans->id, pTrans); return pTrans; } static void trnDropArray(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { - SSdbRaw *pRaw = taosArrayGet(pArray, i); + SSdbRaw *pRaw = taosArrayGetP(pArray, i); tfree(pRaw); } @@ -258,13 +311,13 @@ void mndTransDrop(STrans *pTrans) { trnDropArray(pTrans->redoActions); trnDropArray(pTrans->undoActions); - mDebug("trn:%d, is dropped, %p", pTrans->id, pTrans); + mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); tfree(pTrans); } void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcHandle; - mTrace("trn:%d, set rpc handle:%p", pTrans->id, rpcHandle); + mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle); } static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { @@ -273,7 +326,7 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { return -1; } - void *ptr = taosArrayPush(pArray, pRaw); + void *ptr = taosArrayPush(pArray, &pRaw); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -284,92 +337,92 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw); - mTrace("trn:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code); + mTrace("trans:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw); - mTrace("trn:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code); + mTrace("trans:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw); - mTrace("trn:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code); + mTrace("trans:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg); - mTrace("trn:%d, msg:%p append to redo actions", pTrans->id, pMsg); + mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg); return code; } int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg); - mTrace("trn:%d, msg:%p append to undo actions", pTrans->id, pMsg); + mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg); return code; } -int32_t mndInitTrans(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_TRANS, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndTransActionEncode, - .decodeFp = (SdbDecodeFp)mndTransActionDecode, - .insertFp = (SdbInsertFp)mndTransActionInsert, - .updateFp = (SdbUpdateFp)mndTransActionUpdate, - .deleteFp = (SdbDeleteFp)mndTransActionDelete}; - - return sdbSetTable(pMnode->pSdb, table); -} - -void mndCleanupTrans(SMnode *pMnode) {} - -int32_t mndTransPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) { - if (syncfp == NULL) return -1; +int32_t mndTransPrepare(STrans *pTrans) { + mDebug("trans:%d, prepare transaction", pTrans->id); SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { - mError("trn:%d, failed to decode trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); return -1; } sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); - if (sdbWrite(pTrans->pMnode->pSdb, pRaw) != 0) { - mError("trn:%d, failed to write trans since %s", pTrans->id, terrstr()); + if (sdbWriteNotFree(pTrans->pMnode->pSdb, pRaw) != 0) { + mError("trans:%d, failed to write trans since %s", pTrans->id, terrstr()); return -1; } - if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) { - mError("trn:%d, failed to sync trans since %s", pTrans->id, terrstr()); + STransMsg *pMsg = calloc(1, sizeof(STransMsg)); + pMsg->id = pTrans->id; + pMsg->rpcHandle = pTrans->rpcHandle; + + mDebug("trans:%d, start sync, RPC:%p pMsg:%p", pTrans->id, pTrans->rpcHandle, pMsg); + if (mndSyncPropose(pTrans->pMnode, pRaw, pMsg) != 0) { + mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); + free(pMsg); + sdbFreeRaw(pRaw); return -1; } + sdbFreeRaw(pRaw); return 0; } -static void trnSendRpcRsp(void *rpcHandle, int32_t code) { - if (rpcHandle != NULL) { - SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno}; +static void trnSendRpcRsp(STransMsg *pMsg, int32_t code) { + mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x pMsg:%p", pMsg->id, pMsg->rpcHandle, code & 0xFFFF, pMsg); + if (pMsg->rpcHandle != NULL) { + SRpcMsg rspMsg = {.handle = pMsg->rpcHandle, .code = code}; rpcSendResponse(&rspMsg); } -} -int32_t mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, void *pData, int32_t code) { - if (code != 0) { - trnSendRpcRsp(pData, terrno); - return 0; - } + free(pMsg); +} - if (sdbWrite(pMnode->pSdb, pData) != 0) { - code = terrno; - trnSendRpcRsp(pData, code); - terrno = code; - return -1; +void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) { + if (code == 0) { + mDebug("trans:%d, commit transaction", pMsg->id); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + if (sdbWrite(pMnode->pSdb, pRaw) != 0) { + code = terrno; + mError("trans:%d, failed to write sdb while commit since %s", pMsg->id, terrstr()); + } + trnSendRpcRsp(pMsg, code); + } else { + mDebug("trans:%d, rollback transaction", pMsg->id); + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + if (sdbWrite(pMnode->pSdb, pRaw) != 0) { + mError("trans:%d, failed to write sdb while rollback since %s", pMsg->id, terrstr()); + } + trnSendRpcRsp(pMsg, code); } - - return 0; } static int32_t trnExecuteArray(SMnode *pMnode, SArray *pArray) { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index a3dd2555778fccb768f08c24566a3ac41c179718..13baee4b54155ced1bd20b10b2be2c18a038e505 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -32,6 +32,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg); static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg); +static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); int32_t mndInitUser(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_USER, @@ -47,6 +50,9 @@ int32_t mndInitUser(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_USER, mndProcessAlterUserMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_USER, mndProcessDropUserMsg); + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_USER, mndGetUserMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser); return sdbSetTable(pMnode->pSdb, table); } @@ -70,7 +76,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("user:%s, will be created while deploy sdb", userObj.user); + mDebug("user:%s, will be created while deploy sdb", userObj.user); return sdbWrite(pMnode->pSdb, pRaw); } @@ -164,14 +170,14 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) { mTrace("user:%s, perform update action", pSrcUser->user); - memcpy(pSrcUser->user, pDstUser->user, TSDB_USER_LEN); - memcpy(pSrcUser->pass, pDstUser->pass, TSDB_KEY_LEN); - memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN); - pSrcUser->createdTime = pDstUser->createdTime; - pSrcUser->updateTime = pDstUser->updateTime; - pSrcUser->superAuth = pDstUser->superAuth; - pSrcUser->readAuth = pDstUser->readAuth; - pSrcUser->writeAuth = pDstUser->writeAuth; + memcpy(pDstUser->user, pSrcUser->user, TSDB_USER_LEN); + memcpy(pDstUser->pass, pSrcUser->pass, TSDB_KEY_LEN); + memcpy(pDstUser->acct, pSrcUser->acct, TSDB_USER_LEN); + pDstUser->createdTime = pSrcUser->createdTime; + pDstUser->updateTime = pSrcUser->updateTime; + pDstUser->superAuth = pSrcUser->superAuth; + pDstUser->readAuth = pSrcUser->readAuth; + pDstUser->writeAuth = pSrcUser->writeAuth; return 0; } @@ -197,11 +203,15 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, userObj.writeAuth = 1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); - if (pTrans == NULL) return -1; + if (pTrans == NULL) { + mError("user:%s, failed to create since %s", user, terrstr()); + return -1; + } + mDebug("trans:%d, used to create user:%s", pTrans->id, user); SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("failed to append redo log since %s", terrstr()); + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } @@ -209,7 +219,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj); if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("failed to append undo log since %s", terrstr()); + mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } @@ -217,13 +227,14 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("failed to append commit log since %s", terrstr()); + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans, mndSyncPropose) != 0) { + if (mndTransPrepare(pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } @@ -236,6 +247,8 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; + mDebug("user:%s, start to create", pCreate->user); + if (pCreate->user[0] == 0) { terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; mError("user:%s, failed to create since %s", pCreate->user, terrstr()); @@ -284,4 +297,102 @@ static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process drop user msg since %s", terrstr()); return -1; +} + +static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "privilege"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "account"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + strcpy(pMeta->tableFname, "show users"); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_USER); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + return 0; +} + +static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SUserObj *pUser = NULL; + int32_t cols = 0; + char *pWrite; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_USER, pShow->pIter, (void **)&pUser); + if (pShow->pIter == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->user, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pUser->superAuth) { + const char *src = "super"; + STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); + } else if (pUser->writeAuth) { + const char *src = "writable"; + STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); + } else { + const char *src = "readable"; + STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); + } + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pUser->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->acct, pShow->bytes[cols]); + cols++; + + numOfRows++; + sdbRelease(pSdb, pUser); + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 2fa0838ee802b788d0172662a208651c1db931e9..9ffe8968aef6f18ef2b0be6a2796200ab7ab4eb8 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -333,7 +333,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed to create msg since %s", terrstr()); + mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr()); return NULL; } @@ -341,7 +341,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { mndCleanupMsg(pMsg); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - mError("failed to create msg since %s", terrstr()); + mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr()); return NULL; } memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); @@ -350,13 +350,13 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { pMsg->rpcMsg = *pRpcMsg; pMsg->createdTime = taosGetTimestampSec(); - mTrace("msg:%p, is created", pMsg); + mTrace("msg:%p, app:%p is created, RPC:%p", pMsg, pRpcMsg->ahandle, pRpcMsg->handle); return pMsg; } void mndCleanupMsg(SMnodeMsg *pMsg) { + mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle); taosFreeQitem(pMsg); - mTrace("msg:%p, is destroyed", pMsg); } void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { @@ -371,7 +371,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType % 2 == 1); - mTrace("msg:%p, app:%p will be processed", pMsg, ahandle); + mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]); if (isReq && !mndIsMaster(pMnode)) { code = TSDB_CODE_APP_NOT_READY; @@ -393,7 +393,10 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { } code = (*fp)(pMsg); - if (code != 0) { + if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mTrace("msg:%p, app:%p in progressing", pMsg, ahandle); + return; + } else if (code != 0) { code = terrno; mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index e492f28557aafd2b5141053a45953055f29a6a37..a160533bf23b9141d9ab73d640f723f4369515a2 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -71,7 +71,6 @@ typedef struct SSdb { } SSdb; int32_t sdbWriteFile(SSdb *pSdb); -int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw); #ifdef __cplusplus } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 68cb7be68b249d34a26a854b46635f8d6cb5d0fb..1d4888c2eb37a418e66b994a63510d7f9ba79a72 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -94,7 +94,7 @@ void sdbCleanup(SSdb *pSdb) { taosHashClear(hash); taosHashCleanup(hash); pSdb->hashObjs[i] = NULL; - mTrace("sdb table:%d is cleaned up", i); + mDebug("sdb table:%d is cleaned up", i); } free(pSdb); @@ -129,7 +129,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->hashObjs[sdbType] = hash; taosInitRWLatch(&pSdb->locks[sdbType]); - mTrace("sdb table:%d is initialized", sdbType); + mDebug("sdb table:%d is initialized", sdbType); return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 6f88f08b2c3b3741d289a5d47c94b650bd5bf9fb..af37e9e1d5a60f8a609304ddcc956e30f9ef7d31 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -118,7 +118,7 @@ int32_t sdbReadFile(SSdb *pSdb) { break; } - code = sdbWriteRaw(pSdb, pRaw); + code = sdbWriteNotFree(pSdb, pRaw); if (code != 0) { mError("failed to read file:%s since %s", file, terrstr()); goto PARSE_SDB_DATA_ERROR; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index bdca5eaa987a6498b4d36229e5929984980b33f2..53fcc3f5b0bae6877c8d3f492f568d61f1bf8326 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -98,7 +98,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } SSdbRow *pDstRow = *ppDstRow; - pRow->status = pRaw->status; + pDstRow->status = pRaw->status; taosRUnLockLatch(pLock); SdbUpdateFp updateFp = pSdb->updateFps[pRow->type]; @@ -138,7 +138,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * return code; } -int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { +int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { SHashObj *hash = sdbGetHash(pSdb, pRaw->type); if (hash == NULL) return terrno; @@ -158,7 +158,6 @@ int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize); break; case SDB_STATUS_READY: - case SDB_STATUS_DROPPING: code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize); break; case SDB_STATUS_DROPPED: @@ -170,7 +169,7 @@ int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { } int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { - int32_t code = sdbWriteRaw(pSdb, pRaw); + int32_t code = sdbWriteNotFree(pSdb, pRaw); sdbFreeRaw(pRaw); return code; } @@ -201,7 +200,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { case SDB_STATUS_CREATING: terrno = TSDB_CODE_SDB_OBJ_CREATING; break; - case SDB_STATUS_DROPPING: + case SDB_STATUS_DROPPED: terrno = TSDB_CODE_SDB_OBJ_DROPPING; break; default: diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 7ed1a427f5cc0e767e622343acc8bf023d4d92ac..e37559808e2b5242354be6187c07fc49a732a940 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -26,10 +26,15 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { pRaw->type = type; pRaw->sver = sver; pRaw->dataLen = dataLen; + + mTrace("raw:%p, is created, len:%d", pRaw, dataLen); return pRaw; } -void sdbFreeRaw(SSdbRaw *pRaw) { free(pRaw); } +void sdbFreeRaw(SSdbRaw *pRaw) { + mTrace("raw:%p, is freed", pRaw); + free(pRaw); +} int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) { if (pRaw == NULL) {