From a7e1c7cec6cde547734d4181c38ad1b8bd6794e7 Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 14 Apr 2020 23:33:22 +0800 Subject: [PATCH] [TD-52] refactor sdb codes --- src/inc/mnode.h | 5 ++-- src/inc/mpeer.h | 19 ++++++------- src/inc/tbalance.h | 1 + src/mnode/inc/mgmtSdb.h | 7 ++--- src/mnode/src/mgmtDb.c | 4 +-- src/mnode/src/mgmtMnode.c | 54 ++++++++++++------------------------- src/mnode/src/mgmtSdb.c | 37 +++++++++---------------- src/mnode/src/mgmtShell.c | 21 +-------------- src/mnode/src/mgmtTable.c | 8 +++--- src/mnode/src/mgmtUser.c | 4 +-- src/mnode/src/mgmtVgroup.c | 4 +-- tests/script/tmp/dnode2.sim | 6 ----- tests/script/tmp/mnodes.sim | 7 +++++ 13 files changed, 62 insertions(+), 115 deletions(-) delete mode 100644 tests/script/tmp/dnode2.sim create mode 100644 tests/script/tmp/mnodes.sim diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 00b7519258..e8a0ba3bcc 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -49,11 +49,10 @@ typedef struct _mnode_obj { int8_t reserved[14]; int8_t updateEnd[1]; int32_t refCount; - int8_t role; - int8_t status; - uint16_t port; uint32_t privateIp; uint32_t publicIp; + uint16_t port; + int8_t role; char mnodeName[TSDB_NODE_NAME_LEN + 1]; } SMnodeObj; diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index e5051b39eb..e7abf09321 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -28,27 +28,28 @@ enum _TAOS_MN_STATUS { TAOS_MN_STATUS_READY }; +// general implementation int32_t mpeerInit(); void mpeerCleanup(); + +// special implementation +int32_t mpeerInitMnodes(); +void mpeerCleanupMnodes(); +int32_t mpeerAddMnode(int32_t dnodeId); +int32_t mpeerRemoveMnode(int32_t dnodeId); + +void * mpeerGetMnode(int32_t mnodeId); int32_t mpeerGetMnodesNum(); void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode); void mpeerReleaseMnode(struct _mnode_obj *pMnode); -bool mpeerInServerStatus(); bool mpeerIsMaster(); -bool mpeerCheckRedirect(); void mpeerGetPrivateIpList(SRpcIpSet *ipSet); void mpeerGetPublicIpList(SRpcIpSet *ipSet); void mpeerGetMpeerInfos(void *mpeers); -char * mpeerGetMnodeStatusStr(int32_t status); -char * mpeerGetMnodeRoleStr(int32_t role); - -int32_t mpeerAddMnode(int32_t dnodeId); -int32_t mpeerRemoveMnode(int32_t dnodeId); - -int32_t sdbForwardDbReqToPeer(void *pHead); +int32_t mpeerForwardReqToPeer(void *pHead); #ifdef __cplusplus } diff --git a/src/inc/tbalance.h b/src/inc/tbalance.h index 8cf8cb9fb9..c73d6a91a9 100644 --- a/src/inc/tbalance.h +++ b/src/inc/tbalance.h @@ -31,6 +31,7 @@ struct _dnode_obj; int32_t balanceInit(); void balanceCleanUp(); void balanceNotify(); +void balanceReset(); int32_t balanceAllocVnodes(struct _vg_obj *pVgroup); int32_t balanceDropDnode(struct _dnode_obj *pDnode); diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 27f9a51650..2804d40a71 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -64,25 +64,22 @@ typedef struct { int32_t (*encodeFp)(SSdbOperDesc *pOper); int32_t (*decodeFp)(SSdbOperDesc *pDesc); int32_t (*destroyFp)(SSdbOperDesc *pDesc); - int32_t (*updateAllFp)(); + int32_t (*restoredFp)(); } SSdbTableDesc; typedef struct { - int32_t code; int64_t version; - void * sync; void * wal; - sem_t sem; pthread_mutex_t mutex; } SSdbObject; int32_t sdbInit(); void sdbCleanUp(); SSdbObject *sdbGetObj(); -int sdbProcessWrite(void *param, void *data, int type); void * sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); +int sdbProcessWrite(void *param, void *data, int type); int32_t sdbInsertRow(SSdbOperDesc *pOper); int32_t sdbDeleteRow(SSdbOperDesc *pOper); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 946ec29d8c..089bf494e7 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -102,7 +102,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionUpdateAll() { +static int32_t mgmtDbActionRestored() { return 0; } @@ -123,7 +123,7 @@ int32_t mgmtInitDbs() { .encodeFp = mgmtDbActionEncode, .decodeFp = mgmtDbActionDecode, .destroyFp = mgmtDbActionDestroy, - .updateAllFp = mgmtDbActionUpdateAll + .restoredFp = mgmtDbActionRestored }; tsDbSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index faa66d1fd9..ca18d6bdba 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -19,12 +19,9 @@ #include "trpc.h" #include "tsync.h" #include "mpeer.h" -#include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" -extern int32_t mpeerInitMnodes(); -extern void mpeerCleanupMnodes(); static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -34,18 +31,24 @@ static SMnodeObj tsMnodeObj = {0}; int32_t mpeerInitMnodes() { tsMnodeObj.mnodeId = 1; - tsMnodeObj.dnodeId = 1; tsMnodeObj.privateIp = inet_addr(tsPrivateIp); tsMnodeObj.publicIp = inet_addr(tsPublicIp); tsMnodeObj.createdTime = taosGetTimestampMs(); tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER; - tsMnodeObj.status = TAOS_MN_STATUS_READY; tsMnodeObj.port = tsMnodeDnodePort; sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId); return TSDB_CODE_SUCCESS; } +void mpeerCleanupMnodes() {} +int32_t mpeerAddMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } +int32_t mpeerRemoveMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } +void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; } +int32_t mpeerGetMnodesNum() { return 1; } +void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} +bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } + void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { if (*pMnode == NULL) { *pMnode = &tsMnodeObj; @@ -58,20 +61,21 @@ void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); ipSet->numOfIps = 1; + ipSet->port = htons(tsMnodeObj.port); ipSet->ip[0] = htonl(tsMnodeObj.privateIp); } void mpeerGetPublicIpList(SRpcIpSet *ipSet) { ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); ipSet->numOfIps = 1; + ipSet->port = htons(tsMnodeObj.port); ipSet->ip[0] = htonl(tsMnodeObj.publicIp); } void mpeerGetMpeerInfos(void *param) { SDMNodeInfos *mpeers = param; + mpeers->inUse = 0; mpeers->nodeNum = 1; mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId); mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp); @@ -79,12 +83,9 @@ void mpeerGetMpeerInfos(void *param) { strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); } -void mpeerCleanupMnodes() {} -int32_t mpeerGetMnodesNum() { return 1; } -void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} -bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; } -bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } -bool mpeerCheckRedirect() { return false; } +int32_t mpeerForwardReqToPeer(void *pHead) { + return TSDB_CODE_SUCCESS; +} #endif @@ -98,20 +99,7 @@ void mpeerCleanup() { mpeerCleanupMnodes(); } -char *mpeerGetMnodeStatusStr(int32_t status) { - switch (status) { - case TAOS_MN_STATUS_OFFLINE: - return "offline"; - case TAOS_MN_STATUS_DROPPING: - return "dropping"; - case TAOS_MN_STATUS_READY: - return "ready"; - default: - return "undefined"; - } -} - -char *mpeerGetMnodeRoleStr(int32_t role) { +static char *mpeerGetMnodeRoleStr(int32_t role) { switch (role) { case TAOS_SYNC_ROLE_OFFLINE: return "offline"; @@ -159,12 +147,6 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 10; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "status"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - pShow->bytes[cols] = 10; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "role"); @@ -219,14 +201,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, mpeerGetMnodeStatusStr(pMnode->status)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role)); cols++; numOfRows++; + + mpeerReleaseMnode(pMnode); } pShow->numOfReads += numOfRows; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index c47d9cd9d8..3038b0eadc 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -15,18 +15,13 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosdef.h" #include "taoserror.h" -#include "tchecksum.h" -#include "tglobalcfg.h" #include "tlog.h" #include "trpc.h" -#include "tutil.h" #include "twal.h" -#include "tsync.h" -#include "mpeer.h" #include "hashint.h" #include "hashstr.h" +#include "mpeer.h" #include "mgmtSdb.h" typedef struct _SSdbTable { @@ -39,13 +34,13 @@ typedef struct _SSdbTable { int32_t autoIndex; int64_t numOfRows; void * iHandle; - int32_t (*insertFp)(SSdbOperDesc *pDesc); - int32_t (*deleteFp)(SSdbOperDesc *pOper); - int32_t (*updateFp)(SSdbOperDesc *pOper); - int32_t (*decodeFp)(SSdbOperDesc *pOper); - int32_t (*encodeFp)(SSdbOperDesc *pOper); - int32_t (*destroyFp)(SSdbOperDesc *pOper); - int32_t (*updateAllFp)(); + int32_t (*insertFp)(SSdbOperDesc *pDesc); + int32_t (*deleteFp)(SSdbOperDesc *pOper); + int32_t (*updateFp)(SSdbOperDesc *pOper); + int32_t (*decodeFp)(SSdbOperDesc *pOper); + int32_t (*encodeFp)(SSdbOperDesc *pOper); + int32_t (*destroyFp)(SSdbOperDesc *pOper); + int32_t (*restoredFp)(); pthread_mutex_t mutex; } SSdbTable; @@ -105,15 +100,8 @@ static void *sdbGetTableFromId(int32_t tableId) { return tsSdbTableList[tableId]; } -#ifndef _MPEER -int32_t sdbForwardDbReqToPeer(void *pHead) { - return TSDB_CODE_SUCCESS; -} -#endif - int32_t sdbInit() { tsSdbObj = calloc(1, sizeof(SSdbObject)); - sem_init(&tsSdbObj->sem, 0, 0); pthread_mutex_init(&tsSdbObj->mutex, NULL); SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; @@ -131,8 +119,8 @@ int32_t sdbInit() { for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; - if (pTable->updateAllFp) { - (*pTable->updateAllFp)(); + if (pTable->restoredFp) { + (*pTable->restoredFp)(); } totalRows += pTable->numOfRows; @@ -146,7 +134,6 @@ int32_t sdbInit() { void sdbCleanUp() { if (tsSdbObj) { - sem_destroy(&tsSdbObj->sem); pthread_mutex_destroy(&tsSdbObj->mutex); walClose(tsSdbObj->wal); free(tsSdbObj); @@ -268,7 +255,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ tsSdbObj->version++; pHead->version = tsSdbObj->version; - code = sdbForwardDbReqToPeer(pHead); + code = mpeerForwardReqToPeer(pHead); if (code != TSDB_CODE_SUCCESS) { pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, @@ -523,7 +510,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->encodeFp = pDesc->encodeFp; pTable->decodeFp = pDesc->decodeFp; pTable->destroyFp = pDesc->destroyFp; - pTable->updateAllFp = pDesc->updateAllFp; + pTable->restoredFp = pDesc->restoredFp; if (sdbInitIndexFp[pTable->keyType] != NULL) { pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta)); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index dbd7627d3f..5010429db3 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -42,7 +42,6 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *sec static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); -static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); @@ -142,19 +141,13 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (mpeerCheckRedirect()) { + if (!mpeerIsMaster()) { // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); rpcFreeCont(rpcMsg->pCont); return; } - if (!mpeerInServerStatus()) { - mgmtProcessMsgWhileNotReady(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - return; - } - if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED); rpcFreeCont(rpcMsg->pCont); @@ -501,18 +494,6 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { rpcSendResponse(&rpcRsp); } -static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) { - mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]); - SRpcMsg rpcRsp = { - .msgType = 0, - .pCont = 0, - .contLen = 0, - .code = TSDB_CODE_NOT_READY, - .handle = rpcMsg->handle - }; - rpcSendResponse(&rpcRsp); -} - void mgmtSendSimpleResp(void *thandle, int32_t code) { SRpcMsg rpcRsp = { .msgType = 0, diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index feed12f97e..1c384fdfdf 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -220,7 +220,7 @@ static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionUpdateAll() { +static int32_t mgmtChildTableActionRestored() { void *pNode = NULL; void *pLastNode = NULL; SChildTableObj *pTable = NULL; @@ -320,7 +320,7 @@ static int32_t mgmtInitChildTables() { .encodeFp = mgmtChildTableActionEncode, .decodeFp = mgmtChildTableActionDecode, .destroyFp = mgmtChildTableActionDestroy, - .updateAllFp = mgmtChildTableActionUpdateAll + .restoredFp = mgmtChildTableActionRestored }; tsChildTableSdb = sdbOpenTable(&tableDesc); @@ -414,7 +414,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionUpdateAll() { +static int32_t mgmtSuperTableActionRestored() { return 0; } @@ -435,7 +435,7 @@ static int32_t mgmtInitSuperTables() { .encodeFp = mgmtSuperTableActionEncode, .decodeFp = mgmtSuperTableActionDecode, .destroyFp = mgmtSuperTableActionDestroy, - .updateAllFp = mgmtSuperTableActionUpdateAll + .restoredFp = mgmtSuperTableActionRestored }; tsSuperTableSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 3a49e56331..ef01faf6ba 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -84,7 +84,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionUpdateAll() { +static int32_t mgmtUserActionRestored() { SAcctObj *pAcct = acctGetAcct("root"); mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "monitor", tsInternalPass); @@ -111,7 +111,7 @@ int32_t mgmtInitUsers() { .encodeFp = mgmtUserActionEncode, .decodeFp = mgmtUserActionDecode, .destroyFp = mgmtUserActionDestroy, - .updateAllFp = mgmtUserActionUpdateAll + .restoredFp = mgmtUserActionRestored }; tsUserSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 19468dc547..cc8dba52dd 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -152,7 +152,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionUpdateAll() { +static int32_t mgmtVgroupActionRestored() { return 0; } @@ -173,7 +173,7 @@ int32_t mgmtInitVgroups() { .encodeFp = mgmtVgroupActionEncode, .decodeFp = mgmtVgroupActionDecode, .destroyFp = mgmtVgroupActionDestroy, - .updateAllFp = mgmtVgroupActionUpdateAll, + .restoredFp = mgmtVgroupActionRestored, }; tsVgroupSdb = sdbOpenTable(&tableDesc); diff --git a/tests/script/tmp/dnode2.sim b/tests/script/tmp/dnode2.sim deleted file mode 100644 index 6d9a844fb6..0000000000 --- a/tests/script/tmp/dnode2.sim +++ /dev/null @@ -1,6 +0,0 @@ -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 -system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 -system sh/exec_up.sh -n dnode1 -s start -system sh/exec_up.sh -n dnode2 -s start -sql connect \ No newline at end of file diff --git a/tests/script/tmp/mnodes.sim b/tests/script/tmp/mnodes.sim new file mode 100644 index 0000000000..32e72f16ff --- /dev/null +++ b/tests/script/tmp/mnodes.sim @@ -0,0 +1,7 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 +system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 +system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3 +system sh/cfg.sh -n dnode1 -c numOfMPeers -v 3 +system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 +system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3 -- GitLab