From a09dac94395616b8d00ee75d2c26b97983172ecf Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 18 Apr 2020 22:04:18 +0800 Subject: [PATCH] [TD-52] refactor sdb write codes --- src/dnode/CMakeLists.txt | 2 +- src/dnode/src/dnodeMClient.c | 7 +- src/inc/mnode.h | 1 + src/inc/{treplica.h => tbalance.h} | 17 +- src/mnode/inc/mgmtMnode.h | 5 +- src/mnode/inc/mgmtSdb.h | 49 +- src/mnode/src/mgmtAcct.c | 16 +- .../src/{mgmtReplica.c => mgmtBalance.c} | 13 +- src/mnode/src/mgmtDServer.c | 2 +- src/mnode/src/mgmtDb.c | 22 +- src/mnode/src/mgmtDnode.c | 26 +- src/mnode/src/mgmtMain.c | 14 +- src/mnode/src/mgmtMnode.c | 28 +- src/mnode/src/mgmtSdb.c | 497 +++++++++++------- src/mnode/src/mgmtShell.c | 2 +- src/mnode/src/mgmtTable.c | 68 +-- src/mnode/src/mgmtUser.c | 22 +- src/mnode/src/mgmtVgroup.c | 30 +- src/vnode/src/vnodeMain.c | 3 + 19 files changed, 471 insertions(+), 353 deletions(-) rename src/inc/{treplica.h => tbalance.h} (71%) rename src/mnode/src/{mgmtReplica.c => mgmtBalance.c} (86%) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 5735e1a8c1..af2dc2d777 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -27,7 +27,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ENDIF () IF (TD_SYNC) - TARGET_LINK_LIBRARIES(taosd replica sync) + TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () SET(PREPARE_ENV_CMD "prepare_env_cmd") diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 90a093560f..38be318c25 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -23,12 +23,13 @@ #include "tsync.h" #include "ttime.h" #include "ttimer.h" -#include "treplica.h" +#include "tbalance.h" +#include "vnode.h" +#include "mnode.h" #include "dnode.h" #include "dnodeMClient.h" #include "dnodeModule.h" #include "dnodeMgmt.h" -#include "vnode.h" #define MPEER_CONTENT_LEN 2000 @@ -181,7 +182,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { tsMnodeInfos.nodeInfos[i].nodeName); } dnodeSaveMnodeIpList(); - replicaNotify(); + sdbUpdateSync(); } taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index e7ad88d6b6..35f7650c20 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -24,6 +24,7 @@ int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); void mgmtStopSystem(); +void sdbUpdateSync(); #ifdef __cplusplus } diff --git a/src/inc/treplica.h b/src/inc/tbalance.h similarity index 71% rename from src/inc/treplica.h rename to src/inc/tbalance.h index 3abed1c4aa..9ffa6332c6 100644 --- a/src/inc/treplica.h +++ b/src/inc/tbalance.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_REPLICA_H -#define TDENGINE_REPLICA_H +#ifndef TDENGINE_BALANCE_H +#define TDENGINE_BALANCE_H #ifdef __cplusplus extern "C" { @@ -23,13 +23,12 @@ extern "C" { struct SVgObj; struct SDnodeObj; -int32_t replicaInit(); -void replicaCleanUp(); -void replicaNotify(); -void replicaReset(); -int32_t replicaAllocVnodes(struct SVgObj *pVgroup); -int32_t replicaForwardReqToPeer(void *pHead); -int32_t replicaDropDnode(struct SDnodeObj *pDnode); +int32_t balanceInit(); +void balanceCleanUp(); +void balanceNotify(); +void balanceReset(); +int32_t balanceAllocVnodes(struct SVgObj *pVgroup); +int32_t balanceDropDnode(struct SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index 1faa616ceb..34b5d19cf2 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -39,10 +39,9 @@ int32_t mgmtGetMnodesNum(); void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode); void mgmtReleaseMnode(struct SMnodeObj *pMnode); -bool mgmtIsMaster(); - +char * mgmtGetMnodeRoleStr(); void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp); -void mgmtGetMnodeList(void *mpeers); +void mgmtGetMnodeList(void *mnodes); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 8ecb5ef152..c09d215adb 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -36,54 +36,47 @@ typedef enum { SDB_KEY_STRING, SDB_KEY_INT, SDB_KEY_AUTO -} ESdbKeyType; +} ESdbKey; typedef enum { SDB_OPER_GLOBAL, SDB_OPER_LOCAL -} ESdbOperType; +} ESdbOper; typedef struct { - ESdbOperType type; - void * table; - void * pObj; - int32_t rowSize; - void * rowData; -} SSdbOperDesc; + ESdbOper type; + void * table; + void * pObj; + int32_t rowSize; + void * rowData; +} SSdbOper; typedef struct { char *tableName; int32_t hashSessions; int32_t maxRowSize; int32_t refCountPos; - ESdbTable tableId; - ESdbKeyType keyType; - int32_t (*insertFp)(SSdbOperDesc *pOper); - int32_t (*deleteFp)(SSdbOperDesc *pOper); - int32_t (*updateFp)(SSdbOperDesc *pOper); - int32_t (*encodeFp)(SSdbOperDesc *pOper); - int32_t (*decodeFp)(SSdbOperDesc *pDesc); - int32_t (*destroyFp)(SSdbOperDesc *pDesc); + ESdbTable tableId; + ESdbKey keyType; + int32_t (*insertFp)(SSdbOper *pOper); + int32_t (*deleteFp)(SSdbOper *pOper); + int32_t (*updateFp)(SSdbOper *pOper); + int32_t (*encodeFp)(SSdbOper *pOper); + int32_t (*decodeFp)(SSdbOper *pDesc); + int32_t (*destroyFp)(SSdbOper *pDesc); int32_t (*restoredFp)(); } SSdbTableDesc; -typedef struct { - int64_t version; - void * wal; - pthread_mutex_t mutex; -} SSdbObject; - int32_t sdbInit(); void sdbCleanUp(); -SSdbObject *sdbGetObj(); - void * sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); -int sdbProcessWrite(void *param, void *data, int type); +bool sdbIsMaster(); +void sdbUpdateMnodeRoles(); -int32_t sdbInsertRow(SSdbOperDesc *pOper); -int32_t sdbDeleteRow(SSdbOperDesc *pOper); -int32_t sdbUpdateRow(SSdbOperDesc *pOper); +int32_t sdbInsertRow(SSdbOper *pOper); +int32_t sdbDeleteRow(SSdbOper *pOper); +int32_t sdbUpdateRow(SSdbOper *pOper); void *sdbGetRow(void *handle, void *key); void *sdbFetchRow(void *handle, void *pNode, void **ppRow); diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 3a52715274..3e04399fe7 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -30,28 +30,28 @@ void * tsAcctSdb = NULL; int32_t tsAcctUpdateSize; static void mgmtCreateRootAcct(); -static int32_t mgmtActionAcctDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtActionAcctDestroy(SSdbOper *pOper) { SAcctObj *pAcct = pOper->pObj; pthread_mutex_destroy(&pAcct->mutex); tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtAcctActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtAcctActionInsert(SSdbOper *pOper) { SAcctObj *pAcct = pOper->pObj; memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo)); pthread_mutex_init(&pAcct->mutex, NULL); return TSDB_CODE_SUCCESS; } -static int32_t mgmtActionAcctDelete(SSdbOperDesc *pOper) { +static int32_t mgmtActionAcctDelete(SSdbOper *pOper) { SAcctObj *pAcct = pOper->pObj; mgmtDropAllUsers(pAcct); mgmtDropAllDbs(pAcct); return TSDB_CODE_SUCCESS; } -static int32_t mgmtActionAcctUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtActionAcctUpdate(SSdbOper *pOper) { SAcctObj *pAcct = pOper->pObj; SAcctObj *pSaved = mgmtGetAcct(pAcct->user); if (pAcct != pSaved) { @@ -61,14 +61,14 @@ static int32_t mgmtActionAcctUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtActionActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtActionActionEncode(SSdbOper *pOper) { SAcctObj *pAcct = pOper->pObj; memcpy(pOper->rowData, pAcct, tsAcctUpdateSize); pOper->rowSize = tsAcctUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mgmtActionAcctDecode(SSdbOperDesc *pOper) { +static int32_t mgmtActionAcctDecode(SSdbOper *pOper) { SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj)); if (pAcct == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -110,7 +110,7 @@ int32_t mgmtInitAccts() { return -1; } - mTrace("account table is created"); + mTrace("table:accounts table is created"); return acctInit(); } @@ -179,7 +179,7 @@ static void mgmtCreateRootAcct() { pAcct->acctId = sdbGetId(tsAcctSdb); pAcct->createdTime = taosGetTimestampMs(); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsAcctSdb, .pObj = pAcct, diff --git a/src/mnode/src/mgmtReplica.c b/src/mnode/src/mgmtBalance.c similarity index 86% rename from src/mnode/src/mgmtReplica.c rename to src/mnode/src/mgmtBalance.c index 05a303a69b..8ca651be2c 100644 --- a/src/mnode/src/mgmtReplica.c +++ b/src/mnode/src/mgmtBalance.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "trpc.h" -#include "treplica.h" +#include "tbalance.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtMnode.h" @@ -25,13 +25,12 @@ #ifndef _SYNC -int32_t replicaInit() { return TSDB_CODE_SUCCESS; } -void replicaCleanUp() {} -void replicaNotify() {} -void replicaReset() {} -int32_t replicaForwardReqToPeer(void *pHead) { return TSDB_CODE_SUCCESS; } +int32_t balanceInit() { return TSDB_CODE_SUCCESS; } +void balanceCleanUp() {} +void balanceNotify() {} +void balanceReset() {} -int32_t replicaAllocVnodes(SVgObj *pVgroup) { +int32_t balanceAllocVnodes(SVgObj *pVgroup) { void * pNode = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 4d8163dece..7d5b872f4a 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -21,7 +21,7 @@ #include "tsystem.h" #include "tutil.h" #include "tgrant.h" -#include "treplica.h" +#include "tbalance.h" #include "tglobalcfg.h" #include "dnode.h" #include "mgmtDef.h" diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 9bf05ecfc4..90aaa03e9a 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -46,12 +46,12 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); -static int32_t mgmtDbActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtDbActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtDbActionInsert(SSdbOper *pOper) { SDbObj *pDb = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); @@ -72,7 +72,7 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtDbActionDelete(SSdbOper *pOper) { SDbObj *pDb = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); @@ -84,7 +84,7 @@ static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtDbActionUpdate(SSdbOper *pOper) { SDbObj *pDb = pOper->pObj; SDbObj *pSaved = mgmtGetDb(pDb->name); if (pDb != pSaved) { @@ -94,14 +94,14 @@ static int32_t mgmtDbActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtDbActionEncode(SSdbOper *pOper) { SDbObj *pDb = pOper->pObj; memcpy(pOper->rowData, pDb, tsDbUpdateSize); pOper->rowSize = tsDbUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtDbActionDecode(SSdbOper *pOper) { SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj)); if (pDb == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -146,7 +146,7 @@ int32_t mgmtInitDbs() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs); - mTrace("db data is initialized"); + mTrace("table:dbs table is created"); return 0; } @@ -318,7 +318,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { pDb->createdTime = taosGetTimestampMs(); pDb->cfg = *pCreate; - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, .pObj = pDb, @@ -671,7 +671,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) { if (pDb->status) return TSDB_CODE_SUCCESS; pDb->status = true; - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, .pObj = pDb, @@ -756,7 +756,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { pDb->cfg = newCfg; - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, .pObj = pDb, @@ -814,7 +814,7 @@ static void mgmtDropDb(SQueuedMsg *pMsg) { SDbObj *pDb = pMsg->pDb; mPrint("db:%s, drop db from sdb", pDb->name); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, .pObj = pDb diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 8b5969cfd0..baec309424 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tgrant.h" -#include "treplica.h" +#include "tbalance.h" #include "tglobalcfg.h" #include "ttime.h" #include "tutil.h" @@ -52,12 +52,12 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mgmtDnodeActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtDnodeActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtDnodeActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; if (pDnode->status != TAOS_DN_STATUS_DROPPING) { pDnode->status = TAOS_DN_STATUS_OFFLINE; @@ -72,7 +72,7 @@ static int32_t mgmtDnodeActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDnodeActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; void * pNode = NULL; void * pLastNode = NULL; @@ -85,7 +85,7 @@ static int32_t mgmtDnodeActionDelete(SSdbOperDesc *pOper) { if (pVgroup == NULL) break; if (pVgroup->vnodeGid[0].dnodeId == pDnode->dnodeId) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_LOCAL, .table = tsVgroupSdb, .pObj = pVgroup, @@ -101,7 +101,7 @@ static int32_t mgmtDnodeActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDnodeActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtDnodeActionUpdate(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pSaved = mgmtGetDnode(pDnode->dnodeId); if (pDnode != pSaved) { @@ -111,14 +111,14 @@ static int32_t mgmtDnodeActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDnodeActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtDnodeActionEncode(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; memcpy(pOper->rowData, pDnode, tsDnodeUpdateSize); pOper->rowSize = tsDnodeUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtDnodeActionDecode(SSdbOper *pOper) { SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj)); if (pDnode == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -180,7 +180,7 @@ int32_t mgmtInitDnodes() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes); - mTrace("dnodes table is created"); + mTrace("table:dnodes table is created"); return 0; } @@ -221,7 +221,7 @@ void mgmtReleaseDnode(SDnodeObj *pDnode) { } void mgmtUpdateDnode(SDnodeObj *pDnode) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, .pObj = pDnode, @@ -340,7 +340,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; - replicaNotify(); + balanceNotify(); } mgmtReleaseDnode(pDnode); @@ -393,7 +393,7 @@ static int32_t mgmtCreateDnode(uint32_t ip) { pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM; sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, .pObj = pDnode, @@ -413,7 +413,7 @@ static int32_t mgmtCreateDnode(uint32_t ip) { } int32_t mgmtDropDnode(SDnodeObj *pDnode) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, .pObj = pDnode diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index 38f18b462a..1d2f62a593 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -17,7 +17,7 @@ #include "os.h" #include "taosdef.h" #include "tsched.h" -#include "treplica.h" +#include "tbalance.h" #include "tgrant.h" #include "ttimer.h" #include "dnode.h" @@ -62,7 +62,7 @@ int32_t mgmtStartSystem() { } if (grantInit() < 0) { - mError("failed to init grants"); + mError("failed to init grant"); return -1; } @@ -92,7 +92,7 @@ int32_t mgmtStartSystem() { } if (mgmtInitMnodes() < 0) { - mError("failed to init mpeers"); + mError("failed to init mnodes"); return -1; } @@ -101,8 +101,8 @@ int32_t mgmtStartSystem() { return -1; } - if (replicaInit() < 0) { - mError("failed to init replica") + if (balanceInit() < 0) { + mError("failed to init balance") } if (mgmtInitDClient() < 0) { @@ -144,7 +144,7 @@ void mgmtCleanUpSystem() { mPrint("starting to clean up mgmt"); grantCleanUp(); mgmtCleanupMnodes(); - replicaCleanUp(); + balanceCleanUp(); mgmtCleanUpShell(); mgmtCleanupDClient(); mgmtCleanupDServer(); @@ -161,7 +161,7 @@ void mgmtCleanUpSystem() { } void mgmtStopSystem() { - if (mgmtIsMaster()) { + if (sdbIsMaster()) { mTrace("it is a master mgmt node, it could not be stopped"); return; } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 8087ce5ad1..bfb6480aec 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -18,7 +18,7 @@ #include "taoserror.h" #include "trpc.h" #include "tsync.h" -#include "treplica.h" +#include "tbalance.h" #include "tutil.h" #include "ttime.h" #include "tsocket.h" @@ -30,18 +30,17 @@ #include "mgmtShell.h" #include "mgmtUser.h" -int32_t tsMnodeIsMaster = true; static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mgmtMnodeActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtMnodeActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtMnodeActionInsert(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST; @@ -53,7 +52,7 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtMnodeActionDelete(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); @@ -65,7 +64,7 @@ static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtMnodeActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtMnodeActionUpdate(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pSaved = mgmtGetMnode(pMnode->mnodeId); if (pMnode != pSaved) { @@ -76,14 +75,14 @@ static int32_t mgmtMnodeActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtMnodeActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtMnodeActionEncode(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize); pOper->rowSize = tsMnodeUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mgmtMnodeActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtMnodeActionDecode(SSdbOper *pOper) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); if (pMnode == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -133,7 +132,7 @@ int32_t mgmtInitMnodes() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); - mTrace("mnodes table is created"); + mTrace("table:mnodes table is created"); return TSDB_CODE_SUCCESS; } @@ -157,7 +156,7 @@ void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) { return sdbFetchRow(tsMnodeSdb, pNode, (void **)pMnode); } -static char *mgmtGetMnodeRoleStr(int32_t role) { +char *mgmtGetMnodeRoleStr(int32_t role) { switch (role) { case TAOS_SYNC_ROLE_OFFLINE: return "offline"; @@ -172,8 +171,6 @@ static char *mgmtGetMnodeRoleStr(int32_t role) { } } -bool mgmtIsMaster() { return tsMnodeIsMaster; } - void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp) { void *pNode = NULL; while (1) { @@ -213,10 +210,8 @@ void mgmtGetMnodeList(void *param) { mnodes->nodeInfos[index].nodeIp = htonl(pMnode->pDnode->privateIp); mnodes->nodeInfos[index].nodePort = htons(pMnode->pDnode->mnodeDnodePort); strcpy(mnodes->nodeInfos[index].nodeName, pMnode->pDnode->dnodeName); - mPrint("node:%d role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role)); if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { mnodes->inUse = index; - mPrint("node:%d inUse:%d", pMnode->mnodeId, mnodes->inUse); } index++; @@ -231,7 +226,7 @@ int32_t mgmtAddMnode(int32_t dnodeId) { pMnode->mnodeId = dnodeId; pMnode->createdTime = taosGetTimestampMs(); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsMnodeSdb, .pObj = pMnode, @@ -252,7 +247,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) { return TSDB_CODE_DNODE_NOT_EXIST; } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsMnodeSdb, .pObj = pMnode @@ -268,6 +263,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) { } static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + sdbUpdateMnodeRoles(); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); if (pUser == NULL) return 0; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 4bc18d6a0d..3344b2b759 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -18,65 +18,93 @@ #include "taoserror.h" #include "tlog.h" #include "trpc.h" -#include "treplica.h" +#include "tutil.h" +#include "tbalance.h" #include "tqueue.h" #include "twal.h" +#include "tsync.h" #include "hashint.h" #include "hashstr.h" +#include "dnode.h" +#include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtMnode.h" #include "mgmtSdb.h" +typedef enum { + SDB_ACTION_INSERT, + SDB_ACTION_DELETE, + SDB_ACTION_UPDATE +} ESdbAction; + +typedef enum { + SDB_STATUS_OFFLINE, + SDB_STATUS_SERVING, + SDB_ACTION_CLOSING +} ESdbStatus; + typedef struct _SSdbTable { - char tableName[TSDB_DB_NAME_LEN + 1]; - ESdbTable tableId; - ESdbKeyType keyType; - int32_t hashSessions; - int32_t maxRowSize; - int32_t refCountPos; - int32_t autoIndex; - int64_t numOfRows; - void * iHandle; - int32_t (*insertFp)(SSdbOperDesc *pDesc); - int32_t (*deleteFp)(SSdbOperDesc *pOper); - int32_t (*updateFp)(SSdbOperDesc *pOper); - int32_t (*decodeFp)(SSdbOperDesc *pOper); - int32_t (*encodeFp)(SSdbOperDesc *pOper); - int32_t (*destroyFp)(SSdbOperDesc *pOper); - int32_t (*restoredFp)(); + char tableName[TSDB_DB_NAME_LEN + 1]; + ESdbTable tableId; + ESdbKey keyType; + int32_t hashSessions; + int32_t maxRowSize; + int32_t refCountPos; + int32_t autoIndex; + int64_t numOfRows; + void * iHandle; + int32_t (*insertFp)(SSdbOper *pDesc); + int32_t (*deleteFp)(SSdbOper *pOper); + int32_t (*updateFp)(SSdbOper *pOper); + int32_t (*decodeFp)(SSdbOper *pOper); + int32_t (*encodeFp)(SSdbOper *pOper); + int32_t (*destroyFp)(SSdbOper *pOper); + int32_t (*restoredFp)(); pthread_mutex_t mutex; } SSdbTable; +typedef struct { + ESyncRole role; + ESdbStatus status; + int64_t version; + void * sync; + void * wal; + SSyncCfg cfg; + sem_t sem; + int32_t code; + int32_t numOfTables; + SSdbTable *tableList[SDB_TABLE_MAX]; + pthread_mutex_t mutex; +} SSdbObject; + typedef struct { int32_t rowSize; void * row; -} SRowMeta; - -typedef enum { - SDB_ACTION_INSERT, - SDB_ACTION_DELETE, - SDB_ACTION_UPDATE -} ESdbActionType; - -static SSdbTable *tsSdbTableList[SDB_TABLE_MAX] = {0}; -static int32_t tsSdbNumOfTables = 0; -static SSdbObject * tsSdbObj; +} SSdbRow; +static SSdbObject tsSdbObj = {0}; static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash}; static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash}; static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash}; static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData}; static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; +static int sdbWrite(void *param, void *data, int type); -int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } -int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } +int32_t sdbGetId(void *handle) { + return ((SSdbTable *)handle)->autoIndex; +} + +int64_t sdbGetNumOfRows(void *handle) { + return ((SSdbTable *)handle)->numOfRows; +} uint64_t sdbGetVersion() { - if (tsSdbObj) - return tsSdbObj->version; - else - return 0; + return tsSdbObj.version; +} + +bool sdbIsMaster() { + return tsSdbObj.role == TAOS_SYNC_ROLE_MASTER; } static char *sdbGetActionStr(int32_t action) { @@ -106,26 +134,26 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) { } static void *sdbGetTableFromId(int32_t tableId) { - return tsSdbTableList[tableId]; + return tsSdbObj.tableList[tableId]; } -int32_t sdbInit() { - tsSdbObj = calloc(1, sizeof(SSdbObject)); - pthread_mutex_init(&tsSdbObj->mutex, NULL); - +static int32_t sdbInitWal() { SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; - tsSdbObj->wal = walOpen(tsMnodeDir, &walCfg); - if (tsSdbObj->wal == NULL) { - sdbError("failed to open sdb in %s", tsMnodeDir); + tsSdbObj.wal = walOpen(tsMnodeDir, &walCfg); + if (tsSdbObj.wal == NULL) { + sdbError("failed to open sdb wal in %s", tsMnodeDir); return -1; } - sdbTrace("open sdb file for read"); - walRestore(tsSdbObj->wal, tsSdbObj, sdbProcessWrite); + sdbTrace("open sdb wal for restore"); + walRestore(tsSdbObj.wal, &tsSdbObj, sdbWrite); + return 0; +} +static void sdbRestoreTables() { int32_t totalRows = 0; int32_t numOfTables = 0; - for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) { + for (int32_t tableId = 0; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; if (pTable->restoredFp) { @@ -134,23 +162,170 @@ int32_t sdbInit() { totalRows += pTable->numOfRows; numOfTables++; - sdbTrace("table:%s, is initialized, numOfRows:%d", pTable->tableName, pTable->numOfRows); + sdbTrace("table:%s, is restored, numOfRows:%d", pTable->tableName, pTable->numOfRows); + } + + sdbTrace("sdb is restored, version:%d totalRows:%d numOfTables:%d", tsSdbObj.version, totalRows, numOfTables); +} + +void sdbUpdateMnodeRoles() { + if (tsSdbObj.sync == NULL) return; + + SNodesRole roles = {0}; + syncGetNodesRole(tsSdbObj.sync, &roles); + + mPrint("update mnodes:%d sync roles", tsSdbObj.cfg.replica); + for (int32_t i = 0; i < tsSdbObj.cfg.replica; ++i) { + SMnodeObj *pMnode = mgmtGetMnode(roles.nodeId[i]); + if (pMnode != NULL) { + pMnode->role = roles.role[i]; + mPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role)); + mgmtReleaseMnode(pMnode); + } + } +} + +static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { + sdbUpdateMnodeRoles(); + return 0; +} + +static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) { + strcpy(name, "wal0"); + return 0; +} + +static void sdbNotifyRole(void *ahandle, int8_t role) { + mPrint("mnode role changed from %s to %s", mgmtGetMnodeRoleStr(tsSdbObj.role), mgmtGetMnodeRoleStr(role)); + + if (role == TAOS_SYNC_ROLE_MASTER && tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) { + balanceReset(); + } + tsSdbObj.role = role; + + sdbUpdateMnodeRoles(); +} + +static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { + tsSdbObj.code = code; + sem_post(&tsSdbObj.sem); + mPrint("sdb forward request confirmed, result:%s", tstrerror(code)); +} + +static int32_t sdbForwardToPeer(void *pHead) { + if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; + + int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, NULL); + if (code > 0) { + sem_wait(&tsSdbObj.sem); + return tsSdbObj.code; + } + return code; +} + +void sdbUpdateSync() { + SSyncCfg syncCfg = {0}; + int32_t index = 0; + + SDMNodeInfos *mnodes = dnodeGetMnodeList(); + for (int32_t i = 0; i < mnodes->nodeNum; ++i) { + SDMNodeInfo *node = &mnodes->nodeInfos[i]; + syncCfg.nodeInfo[i].nodeId = node->nodeId; + syncCfg.nodeInfo[i].nodeIp = node->nodeIp; + strcpy(syncCfg.nodeInfo[i].name, node->nodeName); + index++; + } + + if (index == 0) { + void *pNode = NULL; + while (1) { + SMnodeObj *pMnode = NULL; + pNode = mgmtGetNextMnode(pNode, &pMnode); + if (pMnode == NULL) break; + + syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId; + syncCfg.nodeInfo[index].nodeIp = pMnode->pDnode->privateIp; + strcpy(syncCfg.nodeInfo[index].name, pMnode->pDnode->dnodeName); + index++; + + mgmtReleaseMnode(pMnode); + } + } + + syncCfg.replica = index; + syncCfg.arbitratorIp = syncCfg.nodeInfo[0].nodeIp; + if (syncCfg.replica == 1) { + syncCfg.quorum = 1; + } else { + syncCfg.quorum = 2; + } + + bool hasThisDnode = false; + for (int32_t i = 0; i < syncCfg.replica; ++i) { + if (syncCfg.nodeInfo[i].nodeId == dnodeGetDnodeId()) { + hasThisDnode = true; + break; + } + } + + if (!hasThisDnode) return; + if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return; + + mPrint("work as mnode, replica:%d arbitratorIp:%s", syncCfg.replica, taosIpStr(syncCfg.arbitratorIp)); + for (int32_t i = 0; i < syncCfg.replica; ++i) { + mPrint("mnode:%d, ip:%s name:%s", syncCfg.nodeInfo[i].nodeId, taosIpStr(syncCfg.nodeInfo[i].nodeIp), + syncCfg.nodeInfo[i].name); } - sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.version = sdbGetVersion(); + syncInfo.syncCfg = syncCfg; + sprintf(syncInfo.path, "%s/", tsMnodeDir); + syncInfo.ahandle = NULL; + syncInfo.getWalInfo = sdbGetWalInfo; + syncInfo.getFileInfo = sdbGetFileInfo; + syncInfo.writeToCache = sdbWrite; + syncInfo.confirmForward = sdbConfirmForward; + syncInfo.notifyRole = sdbNotifyRole; + tsSdbObj.cfg = syncCfg; + + if (tsSdbObj.sync) { + syncReconfig(tsSdbObj.sync, &syncCfg); + } else { + tsSdbObj.sync = syncStart(&syncInfo); + } +} + +int32_t sdbInit() { + pthread_mutex_init(&tsSdbObj.mutex, NULL); + sem_init(&tsSdbObj.sem, 0, 0); + + if (sdbInitWal() != 0) { + return -1; + } - replicaNotify(); + sdbRestoreTables(); + if (mgmtGetMnodesNum() == 1) { + tsSdbObj.role = TAOS_SYNC_ROLE_MASTER; + } + + sdbUpdateSync(); + + tsSdbObj.status = SDB_STATUS_SERVING; return TSDB_CODE_SUCCESS; } void sdbCleanUp() { - if (tsSdbObj) { - pthread_mutex_destroy(&tsSdbObj->mutex); - walClose(tsSdbObj->wal); - free(tsSdbObj); - tsSdbObj = NULL; - } + if (tsSdbObj.status != SDB_STATUS_SERVING) return; + + syncStop(tsSdbObj.sync); + free(tsSdbObj.sync); + walClose(tsSdbObj.wal); + sem_destroy(&tsSdbObj.sem); + pthread_mutex_destroy(&tsSdbObj.mutex); + memset(&tsSdbObj, 0, sizeof(tsSdbObj)); } void sdbIncRef(void *handle, void *pRow) { @@ -178,15 +353,15 @@ void sdbDecRef(void *handle, void *pRow) { if (refCount <= 0 && *updateEnd) { sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); - SSdbOperDesc oper = {.pObj = pRow}; + SSdbOper oper = {.pObj = pRow}; (*pTable->destroyFp)(&oper); } } } -static SRowMeta *sdbGetRowMeta(void *handle, void *key) { +static SSdbRow *sdbGetRowMeta(void *handle, void *key) { SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta * pMeta; + SSdbRow * pMeta; if (handle == NULL) return NULL; @@ -197,7 +372,7 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) { void *sdbGetRow(void *handle, void *key) { SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta * pMeta; + SSdbRow * pMeta; if (handle == NULL) return NULL; @@ -213,8 +388,8 @@ void *sdbGetRow(void *handle, void *key) { return pMeta->row; } -static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { - SRowMeta rowMeta; +static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { + SSdbRow rowMeta; rowMeta.rowSize = pOper->rowSize; rowMeta.row = pOper->pObj; @@ -229,20 +404,20 @@ static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { pthread_mutex_unlock(&pTable->mutex); - sdbTrace("table:%s, insert record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), + sdbTrace("table:%s, insert record:%s to hash, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows); (*pTable->insertFp)(pOper); return TSDB_CODE_SUCCESS; } -static int32_t sdbDeleteLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { +static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { pthread_mutex_lock(&pTable->mutex); (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj); pTable->numOfRows--; pthread_mutex_unlock(&pTable->mutex); - sdbTrace("table:%s, delete record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), + sdbTrace("table:%s, delete record:%s from hash, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows); (*pTable->deleteFp)(pOper); @@ -253,127 +428,76 @@ static int32_t sdbDeleteLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { - sdbTrace("table:%s, update record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), +static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { + sdbTrace("table:%s, update record:%s in hash, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows); (*pTable->updateFp)(pOper); return TSDB_CODE_SUCCESS; } -static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) { - int32_t code = 0; +static int sdbWrite(void *param, void *data, int type) { + SWalHead *pHead = data; + int32_t tableId = pHead->msgType / 10; + int32_t action = pHead->msgType % 10; - pthread_mutex_lock(&tsSdbObj->mutex); - tsSdbObj->version++; - pHead->version = tsSdbObj->version; + SSdbTable *pTable = sdbGetTableFromId(tableId); + assert(pTable != NULL); - code = replicaForwardReqToPeer(pHead); - if (code != TSDB_CODE_SUCCESS) { - pthread_mutex_unlock(&tsSdbObj->mutex); - sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, - sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code)); - return code; + pthread_mutex_lock(&tsSdbObj.mutex); + if (pHead->version == 0) { + // assign version + tsSdbObj.version++; + pHead->version = tsSdbObj.version; + } else { + // for data from WAL or forward, version may be smaller + if (pHead->version <= tsSdbObj.version) { + pthread_mutex_unlock(&tsSdbObj.mutex); + return TSDB_CODE_SUCCESS; + } else if (pHead->version != tsSdbObj.version + 1) { + pthread_mutex_unlock(&tsSdbObj.mutex); + sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, + tsSdbObj.version); + return TSDB_CODE_OTHERS; + } else { + tsSdbObj.version = pHead->version; + } } - code = walWrite(tsSdbObj->wal, pHead); - pthread_mutex_unlock(&tsSdbObj->mutex); - + int32_t code = walWrite(tsSdbObj.wal, pHead); if (code < 0) { - sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName, - sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code)); - } else { - sdbTrace("table:%s, success to %s record:%s to file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), - sdbGetkeyStr(pTable, pHead->cont), pHead->version); + pthread_mutex_unlock(&tsSdbObj.mutex); + return code; } + walFsync(tsSdbObj.wal); - walFsync(tsSdbObj->wal); - taosFreeQitem(pHead); - return code; -} - -static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) { - pthread_mutex_lock(&tsSdbObj->mutex); - if (pHead->version <= tsSdbObj->version) { - pthread_mutex_unlock(&tsSdbObj->mutex); - return TSDB_CODE_SUCCESS; - } else if (pHead->version != tsSdbObj->version + 1) { - pthread_mutex_unlock(&tsSdbObj->mutex); - sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64, - pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, - tsSdbObj->version); - return TSDB_CODE_OTHERS; - } + sdbForwardToPeer(pHead); + pthread_mutex_unlock(&tsSdbObj.mutex); - tsSdbObj->version = pHead->version; - sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); + // from app, oper is created + if (param == NULL) return code; - int32_t code = -1; + // from wal, should create oper if (action == SDB_ACTION_INSERT) { - SSdbOperDesc oper = { - .rowSize = pHead->len, - .rowData = pHead->cont, - .table = pTable, - }; + SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; code = (*pTable->decodeFp)(&oper); - if (code < 0) { - sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbObj->mutex); - return code; - } - - code = sdbInsertLocal(pTable, &oper); + return sdbInsertHash(pTable, &oper); } else if (action == SDB_ACTION_DELETE) { - SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont); + SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); assert(rowMeta != NULL && rowMeta->row != NULL); - - SSdbOperDesc oper = { - .table = pTable, - .pObj = rowMeta->row, - }; - - code = sdbDeleteLocal(pTable, &oper); + SSdbOper oper = {.table = pTable, .pObj = rowMeta->row}; + return sdbDeleteHash(pTable, &oper); } else if (action == SDB_ACTION_UPDATE) { - SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont); + SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); assert(rowMeta != NULL && rowMeta->row != NULL); - - SSdbOperDesc oper = { - .rowSize = pHead->len, - .rowData = pHead->cont, - .table = pTable, - }; + SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; code = (*pTable->decodeFp)(&oper); - if (code < 0) { - sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbObj->mutex); - return code; - } - code = sdbUpdateLocal(pTable, &oper); - } - - pthread_mutex_unlock(&tsSdbObj->mutex); - return code; + return sdbUpdateHash(pTable, &oper); + } else { return TSDB_CODE_INVALID_MSG_TYPE; } } -int sdbProcessWrite(void *param, void *data, int type) { - SWalHead *pHead = data; - int32_t tableId = pHead->msgType / 10; - int32_t action = pHead->msgType % 10; - - SSdbTable *pTable = sdbGetTableFromId(tableId); - assert(pTable != NULL); - - if (pHead->version == 0) { - return sdbProcessWriteFromApp(pTable, pHead, action); - } else { - return sdbProcessWriteFromWal(pTable, pHead, action); - } -} - -int32_t sdbInsertRow(SSdbOperDesc *pOper) { +int32_t sdbInsertRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; @@ -405,19 +529,19 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); + int32_t code = sdbWrite(NULL, pHead, pHead->msgType); + taosFreeQitem(pHead); if (code < 0) return code; - } - - return sdbInsertLocal(pTable, pOper); + } + + return sdbInsertHash(pTable, pOper); } -// row here can be object or null-terminated string -int32_t sdbDeleteRow(SSdbOperDesc *pOper) { +int32_t sdbDeleteRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj); + SSdbRow *pMeta = sdbGetRowMeta(pTable, pOper->pObj); if (pMeta == NULL) { sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); return -1; @@ -447,18 +571,19 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; memcpy(pHead->cont, pOper->pObj, rowSize); - int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); + int32_t code = sdbWrite(NULL, pHead, pHead->msgType); + taosFreeQitem(pHead); if (code < 0) return code; - } - - return sdbDeleteLocal(pTable, pOper); + } + + return sdbDeleteHash(pTable, pOper); } -int32_t sdbUpdateRow(SSdbOperDesc *pOper) { +int32_t sdbUpdateRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj); + SSdbRow *pMeta = sdbGetRowMeta(pTable, pOper->pObj); if (pMeta == NULL) { sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); return -1; @@ -477,16 +602,17 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); + int32_t code = sdbWrite(NULL, pHead, pHead->msgType); + taosFreeQitem(pHead); if (code < 0) return code; - } + } - return sdbUpdateLocal(pTable, pOper); + return sdbUpdateHash(pTable, pOper); } void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta * pMeta; + SSdbRow * pMeta; *ppRow = NULL; if (pTable == NULL) return NULL; @@ -520,13 +646,13 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->restoredFp = pDesc->restoredFp; if (sdbInitIndexFp[pTable->keyType] != NULL) { - pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta)); + pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SSdbRow)); } pthread_mutex_init(&pTable->mutex, NULL); - tsSdbNumOfTables++; - tsSdbTableList[pTable->tableId] = pTable; + tsSdbObj.numOfTables++; + tsSdbObj.tableList[pTable->tableId] = pTable; return pTable; } @@ -534,16 +660,16 @@ void sdbCloseTable(void *handle) { SSdbTable *pTable = (SSdbTable *)handle; if (pTable == NULL) return; - tsSdbNumOfTables--; - tsSdbTableList[pTable->tableId] = NULL; + tsSdbObj.numOfTables--; + tsSdbObj.tableList[pTable->tableId] = NULL; void *pNode = NULL; while (1) { - SRowMeta *pMeta; + SSdbRow *pMeta; pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta); if (pMeta == NULL) break; - SSdbOperDesc oper = { + SSdbOper oper = { .pObj = pMeta->row, .table = pTable, }; @@ -557,6 +683,7 @@ void sdbCloseTable(void *handle) { pthread_mutex_destroy(&pTable->mutex); - sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbNumOfTables); + sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables); free(pTable); } + diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 1dec4bd015..04f771081b 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -144,7 +144,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (!mgmtIsMaster()) { + if (!sdbIsMaster()) { // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); rpcFreeCont(rpcMsg->pCont); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 3fb4272b7f..383b347410 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -83,12 +83,12 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) { tfree(pTable); } -static int32_t mgmtChildTableActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtChildTableActionDestroy(SSdbOper *pOper) { mgmtDestroyChildTable(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) { SChildTableObj *pTable = pOper->pObj; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); @@ -128,7 +128,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtChildTableActionDelete(SSdbOper *pOper) { SChildTableObj *pTable = pOper->pObj; if (pTable->vgId == 0) { return TSDB_CODE_INVALID_VGROUP_ID; @@ -169,7 +169,7 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) { SChildTableObj *pNew = pOper->pObj; SChildTableObj *pTable = mgmtGetChildTable(pNew->info.tableId); if (pTable != pNew) { @@ -186,7 +186,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtChildTableActionEncode(SSdbOper *pOper) { const int32_t maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS; SChildTableObj *pTable = pOper->pObj; assert(pTable != NULL && pOper->rowData != NULL); @@ -208,7 +208,7 @@ static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtChildTableActionDecode(SSdbOper *pOper) { assert(pOper->rowData != NULL); SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); if (pTable == NULL) { @@ -252,7 +252,7 @@ static int32_t mgmtChildTableActionRestored() { SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); if (pDb == NULL) { mError("ctable:%s, failed to get db, discard it", pTable->info.tableId); - SSdbOperDesc desc = {0}; + SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; desc.pObj = pTable; desc.table = tsChildTableSdb; @@ -266,7 +266,7 @@ static int32_t mgmtChildTableActionRestored() { if (pVgroup == NULL) { mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; - SSdbOperDesc desc = {0}; + SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; desc.pObj = pTable; desc.table = tsChildTableSdb; @@ -280,7 +280,7 @@ static int32_t mgmtChildTableActionRestored() { mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; - SSdbOperDesc desc = {0}; + SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; desc.pObj = pTable; desc.table = tsChildTableSdb; @@ -292,7 +292,7 @@ static int32_t mgmtChildTableActionRestored() { if (pVgroup->tableList == NULL) { mError("ctable:%s, vgroup:%d tableList is null", pTable->info.tableId, pTable->vgId); pTable->vgId = 0; - SSdbOperDesc desc = {0}; + SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; desc.pObj = pTable; desc.table = tsChildTableSdb; @@ -306,7 +306,7 @@ static int32_t mgmtChildTableActionRestored() { if (pSuperTable == NULL) { mError("ctable:%s, stable:%s not exist", pTable->info.tableId, pTable->superTableId); pTable->vgId = 0; - SSdbOperDesc desc = {0}; + SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; desc.pObj = pTable; desc.table = tsChildTableSdb; @@ -347,7 +347,7 @@ static int32_t mgmtInitChildTables() { return -1; } - mTrace("child table is initialized"); + mTrace("table:ctables is created"); return 0; } @@ -392,12 +392,12 @@ static void mgmtDestroySuperTable(SSuperTableObj *pStable) { tfree(pStable); } -static int32_t mgmtSuperTableActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtSuperTableActionDestroy(SSdbOper *pOper) { mgmtDestroySuperTable(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtSuperTableActionInsert(SSdbOper *pOper) { SSuperTableObj *pStable = pOper->pObj; SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); if (pDb != NULL) { @@ -408,7 +408,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtSuperTableActionDelete(SSdbOper *pOper) { SSuperTableObj *pStable = pOper->pObj; SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); if (pDb != NULL) { @@ -420,7 +420,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) { SSuperTableObj *pNew = pOper->pObj; SSuperTableObj *pTable = mgmtGetSuperTable(pNew->info.tableId); if (pTable != pNew) { @@ -435,7 +435,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtSuperTableActionEncode(SSdbOper *pOper) { const int32_t maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS; SSuperTableObj *pStable = pOper->pObj; @@ -454,7 +454,7 @@ static int32_t mgmtSuperTableActionEncode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtSuperTableActionDecode(SSdbOper *pOper) { assert(pOper->rowData != NULL); SSuperTableObj *pStable = (SSuperTableObj *) calloc(1, sizeof(SSuperTableObj)); @@ -505,7 +505,7 @@ static int32_t mgmtInitSuperTables() { return -1; } - mTrace("stables is initialized"); + mTrace("table:stables is created"); return 0; } @@ -731,7 +731,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { tschema[col].bytes = htons(tschema[col].bytes); } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, @@ -755,7 +755,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); } else { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable @@ -806,7 +806,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i pStable->numOfColumns += ntags; pStable->sversion++; - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, @@ -837,7 +837,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, @@ -872,7 +872,7 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag SSchema *schema = (SSchema *) (pStable->schema + (pStable->numOfColumns + col) * sizeof(SSchema)); strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, @@ -931,7 +931,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc mgmtDecAcctRef(pAcct); } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, @@ -968,7 +968,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch mgmtDecAcctRef(pAcct); } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, @@ -1116,7 +1116,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { } if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_LOCAL, .table = tsSuperTableSdb, .pObj = pTable, @@ -1354,7 +1354,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj } } - SSdbOperDesc desc = {0}; + SSdbOper desc = {0}; desc.type = SDB_OPER_GLOBAL; desc.pObj = pTable; desc.table = tsChildTableSdb; @@ -1508,7 +1508,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc mgmtDecAcctRef(pAcct); } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable, @@ -1542,7 +1542,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch mgmtDecAcctRef(pAcct); } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable, @@ -1687,7 +1687,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { } if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_LOCAL, .table = tsChildTableSdb, .pObj = pTable, @@ -1716,7 +1716,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { } if (pTable->superTable == pStable) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_LOCAL, .table = tsChildTableSdb, .pObj = pTable, @@ -1805,7 +1805,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { return; } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable @@ -1848,7 +1848,7 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, tstrerror(rpcMsg->code)); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 9098a0c17d..dc4c3d6bea 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -36,12 +36,12 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); -static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtUserActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtUserActionInsert(SSdbOper *pOper) { SUserObj *pUser = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pUser->acct); @@ -56,7 +56,7 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtUserActionDelete(SSdbOper *pOper) { SUserObj *pUser = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pUser->acct); @@ -67,7 +67,7 @@ static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtUserActionUpdate(SSdbOper *pOper) { SUserObj *pUser = pOper->pObj; SUserObj *pSaved = mgmtGetUser(pUser->user); if (pUser != pSaved) { @@ -77,14 +77,14 @@ static int32_t mgmtUserActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtUserActionEncode(SSdbOper *pOper) { SUserObj *pUser = pOper->pObj; memcpy(pOper->rowData, pUser, tsUserUpdateSize); pOper->rowSize = tsUserUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtUserActionDecode(SSdbOper *pOper) { SUserObj *pUser = (SUserObj *) calloc(1, sizeof(SUserObj)); if (pUser == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -137,7 +137,7 @@ int32_t mgmtInitUsers() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers); - mTrace("user data is initialized"); + mTrace("table:users table is created"); return 0; } @@ -154,7 +154,7 @@ void mgmtReleaseUser(SUserObj *pUser) { } static int32_t mgmtUpdateUser(SUserObj *pUser) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsUserSdb, .pObj = pUser, @@ -202,7 +202,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { pUser->superAuth = 1; } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsUserSdb, .pObj = pUser, @@ -219,7 +219,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { } static int32_t mgmtDropUser(SUserObj *pUser) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsUserSdb, .pObj = pUser @@ -493,7 +493,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) { if (pUser == NULL) break; if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_LOCAL, .table = tsUserSdb, .pObj = pUser, diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 9be95f4087..87d8ddb2b4 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -22,7 +22,7 @@ #include "tidpool.h" #include "tsync.h" #include "ttime.h" -#include "treplica.h" +#include "tbalance.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtDb.h" @@ -48,7 +48,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ; static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); -static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) { +static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; if (pVgroup->idPool) { taosIdPoolCleanUp(pVgroup->idPool); @@ -62,7 +62,7 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { +static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { @@ -104,7 +104,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { +static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; if (pVgroup->pDb != NULL) { @@ -124,7 +124,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) { +static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { SVgObj *pNew = pOper->pObj; SVgObj *pVgroup = mgmtGetVgroup(pNew->vgId); if (pVgroup != pNew) { @@ -147,14 +147,14 @@ static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionEncode(SSdbOperDesc *pOper) { +static int32_t mgmtVgroupActionEncode(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; memcpy(pOper->rowData, pVgroup, tsVgUpdateSize); pOper->rowSize = tsVgUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) { +static int32_t mgmtVgroupActionDecode(SSdbOper *pOper) { SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj)); if (pVgroup == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -199,7 +199,7 @@ int32_t mgmtInitVgroups() { mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); - mTrace("vgroup is initialized"); + mTrace("table:vgroups is created"); return 0; } @@ -213,7 +213,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { } void mgmtUpdateVgroup(SVgObj *pVgroup) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, .pObj = pVgroup, @@ -249,7 +249,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->createdTime = taosGetTimestampMs(); - if (replicaAllocVnodes(pVgroup) != 0) { + if (balanceAllocVnodes(pVgroup) != 0) { mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes); free(pVgroup); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES); @@ -257,7 +257,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { return; } - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, .pObj = pVgroup, @@ -289,7 +289,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { } else { mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mgmtSendDropVgroupMsg(pVgroup, NULL); - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, .pObj = pVgroup @@ -596,7 +596,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); mgmtAddToShellQueue(newMsg); } else { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, .pObj = pVgroup @@ -659,7 +659,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { if (queueMsg->received != queueMsg->expected) return; - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, .pObj = pVgroup @@ -716,7 +716,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { if (pVgroup == NULL) break; if (strncmp(pDropDb->name, pVgroup->dbName, dbNameLen) == 0) { - SSdbOperDesc oper = { + SSdbOper oper = { .type = SDB_OPER_LOCAL, .table = tsVgroupSdb, .pObj = pVgroup, diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0827d90ebc..6c33203cf9 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -45,6 +45,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; } +void syncStop(tsync_h shandle) {} +int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } +int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } #endif static void vnodeInit() { -- GitLab