From f323186f78eba56a531049bae8a69888fe6111b0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 Nov 2020 13:48:05 +0800 Subject: [PATCH] TD-2046 --- src/mnode/inc/mnodeSdb.h | 46 +++--- src/mnode/src/mnodeAcct.c | 41 +++-- src/mnode/src/mnodeCluster.c | 30 ++-- src/mnode/src/mnodeDb.c | 66 ++++---- src/mnode/src/mnodeDnode.c | 50 +++--- src/mnode/src/mnodeMnode.c | 48 +++--- src/mnode/src/mnodeSdb.c | 286 +++++++++++++++++------------------ src/mnode/src/mnodeTable.c | 216 +++++++++++++------------- src/mnode/src/mnodeUser.c | 54 +++---- src/mnode/src/mnodeVgroup.c | 86 +++++------ 10 files changed, 460 insertions(+), 463 deletions(-) diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 965baf7c0d..29d8cf1207 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -23,8 +23,6 @@ extern "C" { #include "mnode.h" #include "twal.h" -struct SSdbTable; - typedef enum { SDB_TABLE_CLUSTER = 0, SDB_TABLE_DNODE = 1, @@ -50,20 +48,20 @@ typedef enum { SDB_OPER_LOCAL = 1 } ESdbOper; -typedef struct SSWriteMsg { - ESdbOper type; - int32_t processedCount; // for sync fwd callback - int32_t code; // for callback in sdb queue - int32_t rowSize; - void * rowData; +typedef struct SSdbRow { + ESdbOper type; + int32_t processedCount; // for sync fwd callback + int32_t code; // for callback in sdb queue + int32_t rowSize; + void * rowData; + void * pObj; + void * pTable; + SMnodeMsg *pMsg; int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); - void * pRow; - SMnodeMsg *pMsg; - struct SSdbTable *pTable; - char reserveForSync[16]; - SWalHead pHead[]; -} SSWriteMsg; + char reserveForSync[16]; + SWalHead pHead[]; +} SSdbRow; typedef struct { char * name; @@ -72,12 +70,12 @@ typedef struct { int32_t refCountPos; ESdbTable id; ESdbKey keyType; - int32_t (*fpInsert)(SSWriteMsg *pWrite); - int32_t (*fpDelete)(SSWriteMsg *pWrite); - int32_t (*fpUpdate)(SSWriteMsg *pWrite); - int32_t (*fpEncode)(SSWriteMsg *pWrite); - int32_t (*fpDecode)(SSWriteMsg *pWrite); - int32_t (*fpDestroy)(SSWriteMsg *pWrite); + int32_t (*fpInsert)(SSdbRow *pRow); + int32_t (*fpDelete)(SSdbRow *pRow); + int32_t (*fpUpdate)(SSdbRow *pRow); + int32_t (*fpEncode)(SSdbRow *pRow); + int32_t (*fpDecode)(SSdbRow *pRow); + int32_t (*fpDestroy)(SSdbRow *pRow); int32_t (*fpRestored)(); } SSdbTableDesc; @@ -89,10 +87,10 @@ bool sdbIsMaster(); bool sdbIsServing(); void sdbUpdateMnodeRoles(); -int32_t sdbInsertRow(SSWriteMsg *pWrite); -int32_t sdbDeleteRow(SSWriteMsg *pWrite); -int32_t sdbUpdateRow(SSWriteMsg *pWrite); -int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite); +int32_t sdbInsertRow(SSdbRow *pRow); +int32_t sdbDeleteRow(SSdbRow *pRow); +int32_t sdbUpdateRow(SSdbRow *pRow); +int32_t sdbInsertRowToQueue(SSdbRow *pRow); void *sdbGetRow(void *pTable, void *key); void *sdbFetchRow(void *pTable, void *pIter, void **ppRow); diff --git a/src/mnode/src/mnodeAcct.c b/src/mnode/src/mnodeAcct.c index 48af408c99..9fff2f0229 100644 --- a/src/mnode/src/mnodeAcct.c +++ b/src/mnode/src/mnodeAcct.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "dnode.h" #include "mnodeDef.h" #include "mnodeInt.h" @@ -25,36 +26,34 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -#include "tglobal.h" - void * tsAcctSdb = NULL; static int32_t tsAcctUpdateSize; static int32_t mnodeCreateRootAcct(); -static int32_t mnodeAcctActionDestroy(SSWriteMsg *pWMsg) { - SAcctObj *pAcct = pWMsg->pRow; +static int32_t mnodeAcctActionDestroy(SSdbRow *pRow) { + SAcctObj *pAcct = pRow->pObj; pthread_mutex_destroy(&pAcct->mutex); - tfree(pWMsg->pRow); + tfree(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionInsert(SSWriteMsg *pWMsg) { - SAcctObj *pAcct = pWMsg->pRow; +static int32_t mnodeAcctActionInsert(SSdbRow *pRow) { + SAcctObj *pAcct = pRow->pObj; memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo)); pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS; pthread_mutex_init(&pAcct->mutex, NULL); return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionDelete(SSWriteMsg *pWMsg) { - SAcctObj *pAcct = pWMsg->pRow; +static int32_t mnodeAcctActionDelete(SSdbRow *pRow) { + SAcctObj *pAcct = pRow->pObj; mnodeDropAllUsers(pAcct); mnodeDropAllDbs(pAcct); return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) { - SAcctObj *pAcct = pWMsg->pRow; +static int32_t mnodeAcctActionUpdate(SSdbRow *pRow) { + SAcctObj *pAcct = pRow->pObj; SAcctObj *pSaved = mnodeGetAcct(pAcct->user); if (pAcct != pSaved) { memcpy(pSaved, pAcct, tsAcctUpdateSize); @@ -64,19 +63,19 @@ static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionEncode(SSWriteMsg *pWMsg) { - SAcctObj *pAcct = pWMsg->pRow; - memcpy(pWMsg->rowData, pAcct, tsAcctUpdateSize); - pWMsg->rowSize = tsAcctUpdateSize; +static int32_t mnodeAcctActionEncode(SSdbRow *pRow) { + SAcctObj *pAcct = pRow->pObj; + memcpy(pRow->rowData, pAcct, tsAcctUpdateSize); + pRow->rowSize = tsAcctUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeAcctActionDecode(SSdbRow *pRow) { SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj)); if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pAcct, pWMsg->rowData, tsAcctUpdateSize); - pWMsg->pRow = pAcct; + memcpy(pAcct, pRow->rowData, tsAcctUpdateSize); + pRow->pObj = pAcct; return TSDB_CODE_SUCCESS; } @@ -226,13 +225,13 @@ static int32_t mnodeCreateRootAcct() { pAcct->acctId = sdbGetId(tsAcctSdb); pAcct->createdTime = taosGetTimestampMs(); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsAcctSdb, - .pRow = pAcct, + .pObj = pAcct, }; - return sdbInsertRow(&wmsg); + return sdbInsertRow(&row); } #ifndef _ACCT diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index ff84b7ac3f..5be67e4ad9 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -32,36 +32,36 @@ static int32_t mnodeCreateCluster(); static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeClusterActionDestroy(SSWriteMsg *pWMsg) { - tfree(pWMsg->pRow); +static int32_t mnodeClusterActionDestroy(SSdbRow *pRow) { + tfree(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionInsert(SSWriteMsg *pWMsg) { +static int32_t mnodeClusterActionInsert(SSdbRow *pRow) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionDelete(SSWriteMsg *pWMsg) { +static int32_t mnodeClusterActionDelete(SSdbRow *pRow) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionUpdate(SSWriteMsg *pWMsg) { +static int32_t mnodeClusterActionUpdate(SSdbRow *pRow) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionEncode(SSWriteMsg *pWMsg) { - SClusterObj *pCluster = pWMsg->pRow; - memcpy(pWMsg->rowData, pCluster, tsClusterUpdateSize); - pWMsg->rowSize = tsClusterUpdateSize; +static int32_t mnodeClusterActionEncode(SSdbRow *pRow) { + SClusterObj *pCluster = pRow->pObj; + memcpy(pRow->rowData, pCluster, tsClusterUpdateSize); + pRow->rowSize = tsClusterUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeClusterActionDecode(SSdbRow *pRow) { SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj)); if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pCluster, pWMsg->rowData, tsClusterUpdateSize); - pWMsg->pRow = pCluster; + memcpy(pCluster, pRow->rowData, tsClusterUpdateSize); + pRow->pObj = pCluster; return TSDB_CODE_SUCCESS; } @@ -145,13 +145,13 @@ static int32_t mnodeCreateCluster() { mDebug("uid is %s", pCluster->uid); } - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsClusterSdb, - .pRow = pCluster, + .pObj = pCluster, }; - return sdbInsertRow(&wmsg); + return sdbInsertRow(&row); } const char* mnodeGetClusterId() { diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 96658ac73c..d121208447 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -56,8 +56,8 @@ static void mnodeDestroyDb(SDbObj *pDb) { tfree(pDb); } -static int32_t mnodeDbActionDestroy(SSWriteMsg *pWMsg) { - mnodeDestroyDb(pWMsg->pRow); +static int32_t mnodeDbActionDestroy(SSdbRow *pRow) { + mnodeDestroyDb(pRow->pObj); return TSDB_CODE_SUCCESS; } @@ -65,8 +65,8 @@ int64_t mnodeGetDbNum() { return sdbGetNumOfRows(tsDbSdb); } -static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) { - SDbObj *pDb = pWMsg->pRow; +static int32_t mnodeDbActionInsert(SSdbRow *pRow) { + SDbObj *pDb = pRow->pObj; SAcctObj *pAcct = mnodeGetAcct(pDb->acct); pthread_mutex_init(&pDb->mutex, NULL); @@ -91,8 +91,8 @@ static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) { - SDbObj *pDb = pWMsg->pRow; +static int32_t mnodeDbActionDelete(SSdbRow *pRow) { + SDbObj *pDb = pRow->pObj; SAcctObj *pAcct = mnodeGetAcct(pDb->acct); mnodeDropAllChildTables(pDb); @@ -107,11 +107,11 @@ static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) { - SDbObj *pNew = pWMsg->pRow; +static int32_t mnodeDbActionUpdate(SSdbRow *pRow) { + SDbObj *pNew = pRow->pObj; SDbObj *pDb = mnodeGetDb(pNew->name); if (pDb != NULL && pNew != pDb) { - memcpy(pDb, pNew, pWMsg->rowSize); + memcpy(pDb, pNew, pRow->rowSize); free(pNew->vgList); free(pNew); } @@ -120,19 +120,19 @@ static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionEncode(SSWriteMsg *pWMsg) { - SDbObj *pDb = pWMsg->pRow; - memcpy(pWMsg->rowData, pDb, tsDbUpdateSize); - pWMsg->rowSize = tsDbUpdateSize; +static int32_t mnodeDbActionEncode(SSdbRow *pRow) { + SDbObj *pDb = pRow->pObj; + memcpy(pRow->rowData, pDb, tsDbUpdateSize); + pRow->rowSize = tsDbUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeDbActionDecode(SSdbRow *pRow) { SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj)); if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pDb, pWMsg->rowData, tsDbUpdateSize); - pWMsg->pRow = pDb; + memcpy(pDb, pRow->rowData, tsDbUpdateSize); + pRow->pObj = pDb; return TSDB_CODE_SUCCESS; } @@ -412,16 +412,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * pMsg->pDb = pDb; mnodeIncDbRef(pDb); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDbSdb, - .pRow = pDb, + .pObj = pDb, .rowSize = sizeof(SDbObj), .pMsg = pMsg, .fpRsp = mnodeCreateDbCb }; - code = sdbInsertRow(&wmsg); + code = sdbInsertRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code)); pMsg->pDb = NULL; @@ -440,8 +440,8 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { } #if 0 -void mnodePrintVgroups(SDbObj *pDb, char *wmsg) { - mInfo("db:%s, vgroup link from head, wmsg:%s", pDb->name, wmsg); +void mnodePrintVgroups(SDbObj *pDb, char *row) { + mInfo("db:%s, vgroup link from head, row:%s", pDb->name, row); SVgObj *pVgroup = pDb->pHead; while (pVgroup != NULL) { mInfo("vgId:%d", pVgroup->vgId); @@ -807,13 +807,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) { if (pDb->status) return TSDB_CODE_SUCCESS; pDb->status = true; - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDbSdb, - .pRow = pDb + .pObj = pDb }; - int32_t code = sdbUpdateRow(&wmsg); + int32_t code = sdbUpdateRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code)); } @@ -1019,15 +1019,15 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { pDb->cfg = newCfg; pDb->cfgVersion++; - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDbSdb, - .pRow = pDb, + .pObj = pDb, .pMsg = pMsg, .fpRsp = mnodeAlterDbCb }; - code = sdbUpdateRow(&wmsg); + code = sdbUpdateRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code)); } @@ -1071,15 +1071,15 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { SDbObj *pDb = pMsg->pDb; mInfo("db:%s, drop db from sdb", pDb->name); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDbSdb, - .pRow = pDb, + .pObj = pDb, .pMsg = pMsg, .fpRsp = mnodeDropDbCb }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code)); } @@ -1134,13 +1134,13 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { if (pDb->pAcct == pAcct) { mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsDbSdb, - .pRow = pDb + .pObj = pDb }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfDbs++; } mnodeDecDbRef(pDb); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 7b52747951..f76533c760 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -87,13 +87,13 @@ static char* offlineReason[] = { "unknown", }; -static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pWMsg) { - tfree(pWMsg->pRow); +static int32_t mnodeDnodeActionDestroy(SSdbRow *pRow) { + tfree(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) { - SDnodeObj *pDnode = pWMsg->pRow; +static int32_t mnodeDnodeActionInsert(SSdbRow *pRow) { + SDnodeObj *pDnode = pRow->pObj; if (pDnode->status != TAOS_DN_STATUS_DROPPING) { pDnode->status = TAOS_DN_STATUS_OFFLINE; pDnode->lastAccess = tsAccessSquence; @@ -107,8 +107,8 @@ static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) { - SDnodeObj *pDnode = pWMsg->pRow; +static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) { + SDnodeObj *pDnode = pRow->pObj; #ifndef _SYNC mnodeDropAllDnodeVgroups(pDnode); @@ -121,11 +121,11 @@ static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) { - SDnodeObj *pNew = pWMsg->pRow; +static int32_t mnodeDnodeActionUpdate(SSdbRow *pRow) { + SDnodeObj *pNew = pRow->pObj; SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId); if (pDnode != NULL && pNew != pDnode) { - memcpy(pDnode, pNew, pWMsg->rowSize); + memcpy(pDnode, pNew, pRow->rowSize); free(pNew); } mnodeDecDnodeRef(pDnode); @@ -134,19 +134,19 @@ static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionEncode(SSWriteMsg *pWMsg) { - SDnodeObj *pDnode = pWMsg->pRow; - memcpy(pWMsg->rowData, pDnode, tsDnodeUpdateSize); - pWMsg->rowSize = tsDnodeUpdateSize; +static int32_t mnodeDnodeActionEncode(SSdbRow *pRow) { + SDnodeObj *pDnode = pRow->pObj; + memcpy(pRow->rowData, pDnode, tsDnodeUpdateSize); + pRow->rowSize = tsDnodeUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeDnodeActionDecode(SSdbRow *pRow) { SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj)); if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pDnode, pWMsg->rowData, tsDnodeUpdateSize); - pWMsg->pRow = pDnode; + memcpy(pDnode, pRow->rowData, tsDnodeUpdateSize); + pRow->pObj = pDnode; return TSDB_CODE_SUCCESS; } @@ -296,13 +296,13 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) { } void mnodeUpdateDnode(SDnodeObj *pDnode) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDnodeSdb, - .pRow = pDnode + .pObj = pDnode }; - int32_t code = sdbUpdateRow(&wmsg); + int32_t code = sdbUpdateRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnodeId:%d, failed update", pDnode->dnodeId); } @@ -644,15 +644,15 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN); taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDnodeSdb, - .pRow = pDnode, + .pObj = pDnode, .rowSize = sizeof(SDnodeObj), .pMsg = pMsg }; - int32_t code = sdbInsertRow(&wmsg); + int32_t code = sdbInsertRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { int dnodeId = pDnode->dnodeId; tfree(pDnode); @@ -665,14 +665,14 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { } int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsDnodeSdb, - .pRow = pDnode, + .pObj = pDnode, .pMsg = pMsg }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); } else { diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index a97aef61e9..205bfda4b9 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -58,13 +58,13 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo #define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock) #endif -static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pWMsg) { - tfree(pWMsg->pRow); +static int32_t mnodeMnodeActionDestroy(SSdbRow *pRow) { + tfree(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) { - SMnodeObj *pMnode = pWMsg->pRow; +static int32_t mnodeMnodeActionInsert(SSdbRow *pRow) { + SMnodeObj *pMnode = pRow->pObj; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -76,8 +76,8 @@ static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) { - SMnodeObj *pMnode = pWMsg->pRow; +static int32_t mnodeMnodeActionDelete(SSdbRow *pRow) { + SMnodeObj *pMnode = pRow->pObj; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -88,30 +88,30 @@ static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pWMsg) { - SMnodeObj *pMnode = pWMsg->pRow; +static int32_t mnodeMnodeActionUpdate(SSdbRow *pRow) { + SMnodeObj *pMnode = pRow->pObj; SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId); if (pMnode != pSaved) { - memcpy(pSaved, pMnode, pWMsg->rowSize); + memcpy(pSaved, pMnode, pRow->rowSize); free(pMnode); } mnodeDecMnodeRef(pSaved); return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionEncode(SSWriteMsg *pWMsg) { - SMnodeObj *pMnode = pWMsg->pRow; - memcpy(pWMsg->rowData, pMnode, tsMnodeUpdateSize); - pWMsg->rowSize = tsMnodeUpdateSize; +static int32_t mnodeMnodeActionEncode(SSdbRow *pRow) { + SMnodeObj *pMnode = pRow->pObj; + memcpy(pRow->rowData, pMnode, tsMnodeUpdateSize); + pRow->rowSize = tsMnodeUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeMnodeActionDecode(SSdbRow *pRow) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pMnode, pWMsg->rowData, tsMnodeUpdateSize); - pWMsg->pRow = pMnode; + memcpy(pMnode, pRow->rowData, tsMnodeUpdateSize); + pRow->pObj = pMnode; return TSDB_CODE_SUCCESS; } @@ -325,10 +325,10 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { pMnode->mnodeId = dnodeId; pMnode->createdTime = taosGetTimestampMs(); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsMnodeSdb, - .pRow = pMnode, + .pObj = pMnode, .fpRsp = mnodeCreateMnodeCb }; @@ -342,7 +342,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { return; } - code = sdbInsertRow(&wmsg); + code = sdbInsertRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code)); tfree(pMnode); @@ -352,8 +352,8 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { void mnodeDropMnodeLocal(int32_t dnodeId) { SMnodeObj *pMnode = mnodeGetMnode(dnodeId); if (pMnode != NULL) { - SSWriteMsg wmsg = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pRow = pMnode}; - sdbDeleteRow(&wmsg); + SSdbRow row = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pObj = pMnode}; + sdbDeleteRow(&row); mnodeDecMnodeRef(pMnode); } @@ -367,13 +367,13 @@ int32_t mnodeDropMnode(int32_t dnodeId) { return TSDB_CODE_MND_DNODE_NOT_EXIST; } - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsMnodeSdb, - .pRow = pMnode + .pObj = pMnode }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); sdbDecRef(tsMnodeSdb, pMnode); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 15e638d436..a79eec16aa 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -65,12 +65,12 @@ typedef struct SSdbTable { int32_t autoIndex; int64_t numOfRows; void * iHandle; - int32_t (*fpInsert)(SSWriteMsg *pWrite); - int32_t (*fpDelete)(SSWriteMsg *pWrite); - int32_t (*fpUpdate)(SSWriteMsg *pWrite); - int32_t (*fpDecode)(SSWriteMsg *pWrite); - int32_t (*fpEncode)(SSWriteMsg *pWrite); - int32_t (*fpDestroy)(SSWriteMsg *pWrite); + int32_t (*fpInsert)(SSdbRow *pRow); + int32_t (*fpDelete)(SSdbRow *pRow); + int32_t (*fpUpdate)(SSdbRow *pRow); + int32_t (*fpDecode)(SSdbRow *pRow); + int32_t (*fpEncode)(SSdbRow *pRow); + int32_t (*fpDestroy)(SSdbRow *pRow); int32_t (*fpRestored)(); pthread_mutex_t mutex; } SSdbTable; @@ -106,17 +106,17 @@ static taos_qall tsSdbWQall; static taos_queue tsSdbWQueue; static SSdbWorkerPool tsSdbPool; -static int32_t sdbProcessWrite(void *pWrite, void *pHead, int32_t qtype, void *unused); +static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused); static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam); -static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action); +static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action); static void * sdbWorkerFp(void *pWorker); static int32_t sdbInitWorker(); static void sdbCleanupWorker(); static int32_t sdbAllocQueue(); static void sdbFreeQueue(); -static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite); -static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite); -static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite); +static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow); +static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow); +static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow); int32_t sdbGetId(void *pTable) { return ((SSdbTable *)pTable)->autoIndex; @@ -248,28 +248,28 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { } // failed to forward, need revert insert -static void sdbHandleFailedConfirm(SSWriteMsg *pWrite) { - SWalHead *pHead = pWrite->pHead; +static void sdbHandleFailedConfirm(SSdbRow *pRow) { + SWalHead *pHead = pRow->pHead; int32_t action = pHead->msgType % 10; - sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pWrite->pRow, - sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, actStr[action], tstrerror(pWrite->code)); + sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pRow->pObj, + sdbGetKeyStr(pRow->pTable, pHead->cont), pHead->version, actStr[action], tstrerror(pRow->code)); // It's better to create a table in two stages, create it first and then set it success if (action == SDB_ACTION_INSERT) { - SSWriteMsg wmsg = {.type = SDB_OPER_GLOBAL, .pTable = pWrite->pTable, .pRow = pWrite->pRow}; - sdbDeleteRow(&wmsg); + SSdbRow row = {.type = SDB_OPER_GLOBAL, .pTable = pRow->pTable, .pObj = pRow->pObj}; + sdbDeleteRow(&row); } } FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { if (wparam == NULL) return; - SSWriteMsg *pWrite = wparam; - SMnodeMsg * pMsg = pWrite->pMsg; + SSdbRow *pRow = wparam; + SMnodeMsg * pMsg = pRow->pMsg; - if (code <= 0) pWrite->code = code; - int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); + if (code <= 0) pRow->code = code; + int32_t count = atomic_add_fetch_32(&pRow->processedCount, 1); if (count <= 1) { if (pMsg != NULL) sdbTrace("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, count, code); return; @@ -277,13 +277,13 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { if (pMsg != NULL) sdbTrace("vgId:1, msg:%p is confirmed, code:%x", pMsg, code); } - if (pWrite->code != TSDB_CODE_SUCCESS) sdbHandleFailedConfirm(pWrite); + if (pRow->code != TSDB_CODE_SUCCESS) sdbHandleFailedConfirm(pRow); - if (pWrite->fpRsp != NULL) { - pWrite->code = (*pWrite->fpRsp)(pMsg, pWrite->code); + if (pRow->fpRsp != NULL) { + pRow->code = (*pRow->fpRsp)(pMsg, pRow->code); } - dnodeSendRpcMWriteRsp(pMsg, pWrite->code); + dnodeSendRpcMWriteRsp(pMsg, pRow->code); } static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); } @@ -447,8 +447,8 @@ void sdbDecRef(void *tparam, void *pRow) { int32_t *updateEnd = pRow + pTable->refCountPos - 4; if (refCount <= 0 && *updateEnd) { sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); - SSWriteMsg wmsg = {.pRow = pRow}; - (*pTable->fpDestroy)(&wmsg); + SSdbRow row = {.pObj = pRow}; + (*pTable->fpDestroy)(&row); } } @@ -485,8 +485,8 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) { return sdbGetRow(pTable, sdbGetObjKey(pTable, key)); } -static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) { - void * key = sdbGetObjKey(pTable, pWrite->pRow); +static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { + void * key = sdbGetObjKey(pTable, pRow->pObj); int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { @@ -494,43 +494,43 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) { } pthread_mutex_lock(&pTable->mutex); - taosHashPut(pTable->iHandle, key, keySize, &pWrite->pRow, sizeof(int64_t)); + taosHashPut(pTable->iHandle, key, keySize, &pRow->pObj, sizeof(int64_t)); pthread_mutex_unlock(&pTable->mutex); - sdbIncRef(pTable, pWrite->pRow); + sdbIncRef(pTable, pRow->pObj); atomic_add_fetch_32(&pTable->numOfRows, 1); if (pTable->keyType == SDB_KEY_AUTO) { - pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pWrite->pRow)); + pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pRow->pObj)); } else { atomic_add_fetch_32(&pTable->autoIndex, 1); } sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name, - sdbGetRowStr(pTable, pWrite->pRow), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg); + sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg); - int32_t code = (*pTable->fpInsert)(pWrite); + int32_t code = (*pTable->fpInsert)(pRow); if (code != TSDB_CODE_SUCCESS) { sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->name, - sdbGetRowStr(pTable, pWrite->pRow)); - sdbDeleteHash(pTable, pWrite); + sdbGetRowStr(pTable, pRow->pObj)); + sdbDeleteHash(pTable, pRow); } return TSDB_CODE_SUCCESS; } -static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { - int32_t *updateEnd = pWrite->pRow + pTable->refCountPos - 4; +static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { + int32_t *updateEnd = pRow->pObj + pTable->refCountPos - 4; bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0; if (!set) { sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->name, - sdbGetRowStr(pTable, pWrite->pRow)); + sdbGetRowStr(pTable, pRow->pObj)); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - (*pTable->fpDelete)(pWrite); + (*pTable->fpDelete)(pRow); - void * key = sdbGetObjKey(pTable, pWrite->pRow); + void * key = sdbGetObjKey(pTable, pRow->pObj); int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { keySize = strlen((char *)key); @@ -543,23 +543,23 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { atomic_sub_fetch_32(&pTable->numOfRows, 1); sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, - sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); + sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); - sdbDecRef(pTable, pWrite->pRow); + sdbDecRef(pTable, pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) { +static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) { sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, - sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); + sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); - (*pTable->fpUpdate)(pWrite); + (*pTable->fpUpdate)(pRow); return TSDB_CODE_SUCCESS; } static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { - SSWriteMsg *pWrite = wparam; + SSdbRow *pRow = wparam; SWalHead *pHead = hparam; int32_t tableId = pHead->msgType / 10; int32_t action = pHead->msgType % 10; @@ -598,22 +598,22 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus pthread_mutex_unlock(&tsSdbMgmt.mutex); - // from app, wmsg is created - if (pWrite != NULL) { + // from app, row is created + if (pRow != NULL) { // forward to peers - pWrite->processedCount = 0; - int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC); - if (syncCode <= 0) pWrite->processedCount = 1; + pRow->processedCount = 0; + int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); + if (syncCode <= 0) pRow->processedCount = 1; if (syncCode < 0) { sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->name, - tstrerror(syncCode), actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); + tstrerror(syncCode), actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pRow->pMsg); } else if (syncCode > 0) { sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->name, - actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); + actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pRow->pMsg); } else { sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->name, - actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); + actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pRow->pMsg); } return syncCode; } @@ -622,71 +622,71 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); // even it is WAL/FWD, it shall be called to update version in sync - syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC); + syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); - // from wal or forward msg, wmsg not created, should add into hash + // from wal or forward msg, row not created, should add into hash if (action == SDB_ACTION_INSERT) { - SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; - code = (*pTable->fpDecode)(&wmsg); - return sdbInsertHash(pTable, &wmsg); + SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; + code = (*pTable->fpDecode)(&row); + return sdbInsertHash(pTable, &row); } else if (action == SDB_ACTION_DELETE) { - void *pRow = sdbGetRowMeta(pTable, pHead->cont); - if (pRow == NULL) { + void *pObj = sdbGetRowMeta(pTable, pHead->cont); + if (pObj == NULL) { sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->name, sdbGetKeyStr(pTable, pHead->cont)); return TSDB_CODE_SUCCESS; } - SSWriteMsg wmsg = {.pTable = pTable, .pRow = pRow}; - return sdbDeleteHash(pTable, &wmsg); + SSdbRow row = {.pTable = pTable, .pObj = pObj}; + return sdbDeleteHash(pTable, &row); } else if (action == SDB_ACTION_UPDATE) { - void *pRow = sdbGetRowMeta(pTable, pHead->cont); - if (pRow == NULL) { + void *pObj = sdbGetRowMeta(pTable, pHead->cont); + if (pObj == NULL) { sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->name, sdbGetKeyStr(pTable, pHead->cont)); return TSDB_CODE_SUCCESS; } - SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; - code = (*pTable->fpDecode)(&wmsg); - return sdbUpdateHash(pTable, &wmsg); + SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; + code = (*pTable->fpDecode)(&row); + return sdbUpdateHash(pTable, &row); } else { return TSDB_CODE_MND_INVALID_MSG_TYPE; } } -int32_t sdbInsertRow(SSWriteMsg *pWrite) { - SSdbTable *pTable = pWrite->pTable; +int32_t sdbInsertRow(SSdbRow *pRow) { + SSdbTable *pTable = pRow->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - if (sdbGetRowFromObj(pTable, pWrite->pRow)) { - sdbError("vgId:1, sdb:%s, failed to insert:%s since it exist", pTable->name, sdbGetRowStr(pTable, pWrite->pRow)); - sdbDecRef(pTable, pWrite->pRow); + if (sdbGetRowFromObj(pTable, pRow->pObj)) { + sdbError("vgId:1, sdb:%s, failed to insert:%s since it exist", pTable->name, sdbGetRowStr(pTable, pRow->pObj)); + sdbDecRef(pTable, pRow->pObj); return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; } if (pTable->keyType == SDB_KEY_AUTO) { - *((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1); + *((uint32_t *)pRow->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1); // let vgId increase from 2 if (pTable->autoIndex == 1 && pTable->id == SDB_TABLE_VGROUP) { - *((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1); + *((uint32_t *)pRow->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1); } } - int32_t code = sdbInsertHash(pTable, pWrite); + int32_t code = sdbInsertHash(pTable, pRow); if (code != TSDB_CODE_SUCCESS) { - sdbError("vgId:1, sdb:%s, failed to insert:%s into hash", pTable->name, sdbGetRowStr(pTable, pWrite->pRow)); + sdbError("vgId:1, sdb:%s, failed to insert:%s into hash", pTable->name, sdbGetRowStr(pTable, pRow->pObj)); return code; } // just insert data into memory - if (pWrite->type != SDB_OPER_GLOBAL) { + if (pRow->type != SDB_OPER_GLOBAL) { return TSDB_CODE_SUCCESS; } - if (pWrite->fpReq) { - return (*pWrite->fpReq)(pWrite->pMsg); + if (pRow->fpReq) { + return (*pRow->fpReq)(pRow->pMsg); } else { - return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT); + return sdbWriteRowToQueue(pRow, SDB_ACTION_INSERT); } } @@ -698,59 +698,59 @@ bool sdbCheckRowDeleted(void *tparam, void *pRow) { return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1; } -int32_t sdbDeleteRow(SSWriteMsg *pWrite) { - SSdbTable *pTable = pWrite->pTable; +int32_t sdbDeleteRow(SSdbRow *pRow) { + SSdbTable *pTable = pRow->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pRow); - if (pRow == NULL) { + void *pObj = sdbGetRowMetaFromObj(pTable, pRow->pObj); + if (pObj == NULL) { sdbDebug("vgId:1, sdb:%s, record is not there, delete failed", pTable->name); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - int32_t code = sdbDeleteHash(pTable, pWrite); + int32_t code = sdbDeleteHash(pTable, pRow); if (code != TSDB_CODE_SUCCESS) { sdbError("vgId:1, sdb:%s, failed to delete from hash", pTable->name); return code; } // just delete data from memory - if (pWrite->type != SDB_OPER_GLOBAL) { + if (pRow->type != SDB_OPER_GLOBAL) { return TSDB_CODE_SUCCESS; } - if (pWrite->fpReq) { - return (*pWrite->fpReq)(pWrite->pMsg); + if (pRow->fpReq) { + return (*pRow->fpReq)(pRow->pMsg); } else { - return sdbWriteRowToQueue(pWrite, SDB_ACTION_DELETE); + return sdbWriteRowToQueue(pRow, SDB_ACTION_DELETE); } } -int32_t sdbUpdateRow(SSWriteMsg *pWrite) { - SSdbTable *pTable = pWrite->pTable; +int32_t sdbUpdateRow(SSdbRow *pRow) { + SSdbTable *pTable = pRow->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pRow); - if (pRow == NULL) { + void *pObj = sdbGetRowMetaFromObj(pTable, pRow->pObj); + if (pObj == NULL) { sdbDebug("vgId:1, sdb:%s, record is not there, update failed", pTable->name); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - int32_t code = sdbUpdateHash(pTable, pWrite); + int32_t code = sdbUpdateHash(pTable, pRow); if (code != TSDB_CODE_SUCCESS) { sdbError("vgId:1, sdb:%s, failed to update hash", pTable->name); return code; } // just update data in memory - if (pWrite->type != SDB_OPER_GLOBAL) { + if (pRow->type != SDB_OPER_GLOBAL) { return TSDB_CODE_SUCCESS; } - if (pWrite->fpReq) { - return (*pWrite->fpReq)(pWrite->pMsg); + if (pRow->fpReq) { + return (*pRow->fpReq)(pRow->pMsg); } else { - return sdbWriteRowToQueue(pWrite, SDB_ACTION_UPDATE); + return sdbWriteRowToQueue(pRow, SDB_ACTION_UPDATE); } } @@ -830,12 +830,12 @@ void sdbCloseTable(void *handle) { void **ppRow = taosHashIterGet(pIter); if (ppRow == NULL) continue; - SSWriteMsg wmsg = { - .pRow = *ppRow, + SSdbRow row = { + .pObj = *ppRow, .pTable = pTable, }; - (*pTable->fpDestroy)(&wmsg); + (*pTable->fpDestroy)(&row); } taosHashDestroyIter(pIter); @@ -934,12 +934,12 @@ static void sdbFreeQueue() { tsSdbWQueue = NULL; } -static int32_t sdbWriteToQueue(SSWriteMsg *pWrite, int32_t qtype) { - SWalHead *pHead = pWrite->pHead; +static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) { + SWalHead *pHead = pRow->pHead; if (pHead->len > TSDB_MAX_WAL_SIZE) { sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version); - taosFreeQitem(pWrite); + taosFreeQitem(pRow); return TSDB_CODE_WAL_SIZE_LIMIT; } @@ -949,64 +949,64 @@ static int32_t sdbWriteToQueue(SSWriteMsg *pWrite, int32_t qtype) { taosMsleep(1); } - sdbIncRef(pWrite->pTable, pWrite->pRow); + sdbIncRef(pRow->pTable, pRow->pObj); - sdbTrace("vgId:1, msg:%p write into to sdb queue", pWrite->pMsg); - taosWriteQitem(tsSdbWQueue, qtype, pWrite); + sdbTrace("vgId:1, msg:%p write into to sdb queue", pRow->pMsg); + taosWriteQitem(tsSdbWQueue, qtype, pRow); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static void sdbFreeFromQueue(SSWriteMsg *pWrite) { +static void sdbFreeFromQueue(SSdbRow *pRow) { int32_t queued = atomic_sub_fetch_32(&tsSdbMgmt.queuedMsg, 1); - sdbTrace("vgId:1, msg:%p free from sdb queue, queued:%d", pWrite->pMsg, queued); + sdbTrace("vgId:1, msg:%p free from sdb queue, queued:%d", pRow->pMsg, queued); - sdbDecRef(pWrite->pTable, pWrite->pRow); - taosFreeQitem(pWrite); + sdbDecRef(pRow->pTable, pRow->pObj); + taosFreeQitem(pRow); } static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { SWalHead *pHead = wparam; - int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pHead->len; - SSWriteMsg *pWrite = taosAllocateQitem(size); - if (pWrite == NULL) { + int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len; + SSdbRow *pRow = taosAllocateQitem(size); + if (pRow == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; } - return sdbWriteToQueue(pWrite, qtype); + return sdbWriteToQueue(pRow, qtype); } -static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action) { - SSdbTable *pTable = pInputWrite->pTable; +static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) { + SSdbTable *pTable = pInputRow->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize; - SSWriteMsg *pWrite = taosAllocateQitem(size); - if (pWrite == NULL) { + int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pTable->maxRowSize; + SSdbRow *pRow = taosAllocateQitem(size); + if (pRow == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; } - memcpy(pWrite, pInputWrite, sizeof(SSWriteMsg)); - pWrite->processedCount = 1; + memcpy(pRow, pInputRow, sizeof(SSdbRow)); + pRow->processedCount = 1; - SWalHead *pHead = pWrite->pHead; - pWrite->rowData = pHead->cont; - (*pTable->fpEncode)(pWrite); + SWalHead *pHead = pRow->pHead; + pRow->rowData = pHead->cont; + (*pTable->fpEncode)(pRow); - pHead->len = pWrite->rowSize; + pHead->len = pRow->rowSize; pHead->version = 0; pHead->msgType = pTable->id * 10 + action; - return sdbWriteToQueue(pWrite, TAOS_QTYPE_RPC); + return sdbWriteToQueue(pRow, TAOS_QTYPE_RPC); } -int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite) { return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT); } +int32_t sdbInsertRowToQueue(SSdbRow *pRow) { return sdbWriteRowToQueue(pRow, SDB_ACTION_INSERT); } static void *sdbWorkerFp(void *pWorker) { - SSWriteMsg *pWrite; - int32_t qtype; - void * unUsed; + SSdbRow *pRow; + int32_t qtype; + void * unUsed; while (1) { int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed); @@ -1016,14 +1016,14 @@ static void *sdbWorkerFp(void *pWorker) { } for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite); - sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pWrite->pMsg, pWrite->pRow, - pWrite->pHead->version); + taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow); + sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pRow->pMsg, pRow->pObj, + pRow->pHead->version); - pWrite->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pWrite : NULL, pWrite->pHead, qtype, NULL); - if (pWrite->code > 0) pWrite->code = 0; + pRow->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pRow : NULL, pRow->pHead, qtype, NULL); + if (pRow->code > 0) pRow->code = 0; - sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pWrite->pMsg, pWrite->code); + sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pRow->pMsg, pRow->code); } walFsync(tsSdbMgmt.wal, true); @@ -1031,16 +1031,16 @@ static void *sdbWorkerFp(void *pWorker) { // browse all items, and process them one by one taosResetQitems(tsSdbWQall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite); + taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow); if (qtype == TAOS_QTYPE_RPC) { - sdbConfirmForward(NULL, pWrite, pWrite->code); + sdbConfirmForward(NULL, pRow, pRow->code); } else if (qtype == TAOS_QTYPE_FWD) { - syncConfirmForward(tsSdbMgmt.sync, pWrite->pHead->version, pWrite->code); + syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code); } else { } - sdbFreeFromQueue(pWrite); + sdbFreeFromQueue(pRow); } } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 587c766e44..29957f8ec0 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -99,13 +99,13 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) { tfree(pTable); } -static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pWMsg) { - mnodeDestroyChildTable(pWMsg->pRow); +static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) { + mnodeDestroyChildTable(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) { - SCTableObj *pTable = pWMsg->pRow; +static int32_t mnodeChildTableActionInsert(SSdbRow *pRow) { + SCTableObj *pTable = pRow->pObj; SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -153,8 +153,8 @@ static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) { - SCTableObj *pTable = pWMsg->pRow; +static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) { + SCTableObj *pTable = pRow->pObj; if (pTable->vgId == 0) { return TSDB_CODE_MND_VGROUP_NOT_EXIST; } @@ -189,8 +189,8 @@ static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) { - SCTableObj *pNew = pWMsg->pRow; +static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) { + SCTableObj *pNew = pRow->pObj; SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); if (pTable != pNew) { void *oldTableId = pTable->info.tableId; @@ -216,50 +216,50 @@ static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionEncode(SSWriteMsg *pWMsg) { - SCTableObj *pTable = pWMsg->pRow; - assert(pTable != NULL && pWMsg->rowData != NULL); +static int32_t mnodeChildTableActionEncode(SSdbRow *pRow) { + SCTableObj *pTable = pRow->pObj; + assert(pTable != NULL && pRow->rowData != NULL); int32_t len = strlen(pTable->info.tableId); if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID; - memcpy(pWMsg->rowData, pTable->info.tableId, len); - memset(pWMsg->rowData + len, 0, 1); + memcpy(pRow->rowData, pTable->info.tableId, len); + memset(pRow->rowData + len, 0, 1); len++; - memcpy(pWMsg->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); + memcpy(pRow->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - memcpy(pWMsg->rowData + len, pTable->schema, schemaSize); + memcpy(pRow->rowData + len, pTable->schema, schemaSize); len += schemaSize; if (pTable->sqlLen != 0) { - memcpy(pWMsg->rowData + len, pTable->sql, pTable->sqlLen); + memcpy(pRow->rowData + len, pTable->sql, pTable->sqlLen); len += pTable->sqlLen; } } - pWMsg->rowSize = len; + pRow->rowSize = len; return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) { - assert(pWMsg->rowData != NULL); +static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) { + assert(pRow->rowData != NULL); SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - int32_t len = strlen(pWMsg->rowData); + int32_t len = strlen(pRow->rowData); if (len >= TSDB_TABLE_FNAME_LEN) { free(pTable); return TSDB_CODE_MND_INVALID_TABLE_ID; } - pTable->info.tableId = strdup(pWMsg->rowData); + pTable->info.tableId = strdup(pRow->rowData); len++; - memcpy((char*)pTable + sizeof(char *), pWMsg->rowData + len, tsChildTableUpdateSize); + memcpy((char*)pTable + sizeof(char *), pRow->rowData + len, tsChildTableUpdateSize); len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { @@ -269,7 +269,7 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_INVALID_TABLE_TYPE; } - memcpy(pTable->schema, pWMsg->rowData + len, schemaSize); + memcpy(pTable->schema, pRow->rowData + len, schemaSize); len += schemaSize; if (pTable->sqlLen != 0) { @@ -278,11 +278,11 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_OUT_OF_MEMORY; } - memcpy(pTable->sql, pWMsg->rowData + len, pTable->sqlLen); + memcpy(pTable->sql, pRow->rowData + len, pTable->sqlLen); } } - pWMsg->pRow = pTable; + pRow->pObj = pTable; return TSDB_CODE_SUCCESS; } @@ -297,7 +297,7 @@ static int32_t mnodeChildTableActionRestored() { SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId); if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) { mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId); - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb}; + SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); mnodeDecDbRef(pDb); @@ -309,7 +309,7 @@ static int32_t mnodeChildTableActionRestored() { if (pVgroup == NULL) { mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid); pTable->vgId = 0; - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb}; + SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); continue; @@ -320,7 +320,7 @@ static int32_t mnodeChildTableActionRestored() { mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it", pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid); pTable->vgId = 0; - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb}; + SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); continue; @@ -331,7 +331,7 @@ static int32_t mnodeChildTableActionRestored() { if (pSuperTable == NULL) { mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid); pTable->vgId = 0; - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb}; + SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); continue; @@ -430,13 +430,13 @@ static void mnodeDestroySuperTable(SSTableObj *pStable) { tfree(pStable); } -static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pWMsg) { - mnodeDestroySuperTable(pWMsg->pRow); +static int32_t mnodeSuperTableActionDestroy(SSdbRow *pRow) { + mnodeDestroySuperTable(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) { - SSTableObj *pStable = pWMsg->pRow; +static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) { + SSTableObj *pStable = pRow->pObj; SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) { mnodeAddSuperTableIntoDb(pDb); @@ -446,8 +446,8 @@ static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) { - SSTableObj *pStable = pWMsg->pRow; +static int32_t mnodeSuperTableActionDelete(SSdbRow *pRow) { + SSTableObj *pStable = pRow->pObj; SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); if (pDb != NULL) { mnodeRemoveSuperTableFromDb(pDb); @@ -458,8 +458,8 @@ static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) { - SSTableObj *pNew = pWMsg->pRow; +static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { + SSTableObj *pNew = pRow->pObj; SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); if (pTable != NULL && pTable != pNew) { void *oldTableId = pTable->info.tableId; @@ -483,43 +483,43 @@ static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pWMsg) { - SSTableObj *pStable = pWMsg->pRow; - assert(pWMsg->pRow != NULL && pWMsg->rowData != NULL); +static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) { + SSTableObj *pStable = pRow->pObj; + assert(pRow->pObj != NULL && pRow->rowData != NULL); int32_t len = strlen(pStable->info.tableId); if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID; - memcpy(pWMsg->rowData, pStable->info.tableId, len); - memset(pWMsg->rowData + len, 0, 1); + memcpy(pRow->rowData, pStable->info.tableId, len); + memset(pRow->rowData + len, 0, 1); len++; - memcpy(pWMsg->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); + memcpy(pRow->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); - memcpy(pWMsg->rowData + len, pStable->schema, schemaSize); + memcpy(pRow->rowData + len, pStable->schema, schemaSize); len += schemaSize; - pWMsg->rowSize = len; + pRow->rowSize = len; return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) { - assert(pWMsg->rowData != NULL); +static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) { + assert(pRow->rowData != NULL); SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - int32_t len = strlen(pWMsg->rowData); + int32_t len = strlen(pRow->rowData); if (len >= TSDB_TABLE_FNAME_LEN){ free(pStable); return TSDB_CODE_MND_INVALID_TABLE_ID; } - pStable->info.tableId = strdup(pWMsg->rowData); + pStable->info.tableId = strdup(pRow->rowData); len++; - memcpy((char*)pStable + sizeof(char *), pWMsg->rowData + len, tsSuperTableUpdateSize); + memcpy((char*)pStable + sizeof(char *), pRow->rowData + len, tsSuperTableUpdateSize); len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); @@ -529,9 +529,9 @@ static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) { return TSDB_CODE_MND_NOT_SUPER_TABLE; } - memcpy(pStable->schema, pWMsg->rowData + len, schemaSize); + memcpy(pStable->schema, pRow->rowData + len, schemaSize); - pWMsg->pRow = pStable; + pRow->pObj = pStable; return TSDB_CODE_SUCCESS; } @@ -828,7 +828,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { } else { mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, tstrerror(code)); - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pTable, .pTable = tsSuperTableSdb}; + SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb}; sdbDeleteRow(&desc); } @@ -878,16 +878,16 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { pMsg->pTable = (STableObj *)pStable; mnodeIncTableRef(pMsg->pTable); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .rowSize = sizeof(SSTableObj) + schemaSize, .pMsg = pMsg, .fpRsp = mnodeCreateSuperTableCb }; - int32_t code = sdbInsertRow(&wmsg); + int32_t code = sdbInsertRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeDestroySuperTable(pStable); pMsg->pTable = NULL; @@ -937,15 +937,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { mnodeDropAllChildTablesInStable(pStable); } - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeDropSuperTableCb }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); @@ -1010,15 +1010,15 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t mInfo("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, schema[0].name); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeAddSuperTableTagCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { @@ -1044,15 +1044,15 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeDropSuperTableTagCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { @@ -1088,15 +1088,15 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c mInfo("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, oldTagName, newTagName); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeModifySuperTableTagNameCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) { @@ -1162,15 +1162,15 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeAddSuperTableColumnCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { @@ -1207,15 +1207,15 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeDropSuperTableColumnCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { @@ -1251,15 +1251,15 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char mInfo("app:%p:%p, stable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, oldName, newName); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsSuperTableSdb, - .pRow = pStable, + .pObj = pStable, .pMsg = pMsg, .fpRsp = mnodeChangeSuperTableColumnCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } // show super tables @@ -1417,12 +1417,12 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) { if (pTable == NULL) break; if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsSuperTableSdb, - .pRow = pTable, + .pObj = pTable, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfTables ++; } @@ -1694,7 +1694,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { } else { mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code)); - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pTable, .pTable = tsChildTableSdb}; + SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); return code; } @@ -1780,9 +1780,9 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { pMsg->pTable = (STableObj *)pTable; mnodeIncTableRef(pMsg->pTable); - SSWriteMsg desc = { + SSdbRow desc = { .type = SDB_OPER_GLOBAL, - .pRow = pTable, + .pObj = pTable, .pTable = tsChildTableSdb, .pMsg = pMsg, .fpReq = mnodeDoCreateChildTableFp @@ -1901,15 +1901,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_APP_ERROR; } - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, .pMsg = pMsg, .fpRsp = mnodeDropChildTableCb }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, tstrerror(code)); @@ -2005,15 +2005,15 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, .pMsg = pMsg, .fpRsp = mnodeAlterNormalTableColumnCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { @@ -2038,15 +2038,15 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, .pMsg = pMsg, .fpRsp = mnodeAlterNormalTableColumnCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { @@ -2075,15 +2075,15 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char mInfo("app:%p:%p, ctable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, oldName, newName); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, .pMsg = pMsg, .fpRsp = mnodeAlterNormalTableColumnCb }; - return sdbUpdateRow(&wmsg); + return sdbUpdateRow(&row); } static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTable) { @@ -2218,12 +2218,12 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) { if (pTable == NULL) break; if (pTable->vgId == pVgroup->vgId) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfTables++; } mnodeDecTableRef(pTable); @@ -2251,12 +2251,12 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) { if (pTable == NULL) break; if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfTables++; } mnodeDecTableRef(pTable); @@ -2280,12 +2280,12 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) { if (pTable == NULL) break; if (pTable->superTable == pStable) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsChildTableSdb, - .pRow = pTable, + .pObj = pTable, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfTables++; } @@ -2410,9 +2410,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { } if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - SSWriteMsg desc = { + SSdbRow desc = { .type = SDB_OPER_GLOBAL, - .pRow = pTable, + .pObj = pTable, .pTable = tsChildTableSdb, .pMsg = mnodeMsg, .fpRsp = mnodeDoCreateChildTableCb @@ -2440,8 +2440,8 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle, mnodeMsg->incomingTs, sec, mnodeMsg->retry); - SSWriteMsg wmsg = {.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pRow = pTable}; - sdbDeleteRow(&wmsg); + SSdbRow row = {.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pObj = pTable}; + sdbDeleteRow(&row); if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) { //Avoid retry again in client diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 9de14dbfd1..dc76d92eb8 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -42,13 +42,13 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg); -static int32_t mnodeUserActionDestroy(SSWriteMsg *pWMsg) { - tfree(pWMsg->pRow); +static int32_t mnodeUserActionDestroy(SSdbRow *pRow) { + tfree(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) { - SUserObj *pUser = pWMsg->pRow; +static int32_t mnodeUserActionInsert(SSdbRow *pRow) { + SUserObj *pUser = pRow->pObj; SAcctObj *pAcct = mnodeGetAcct(pUser->acct); if (pAcct != NULL) { @@ -62,8 +62,8 @@ static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) { - SUserObj *pUser = pWMsg->pRow; +static int32_t mnodeUserActionDelete(SSdbRow *pRow) { + SUserObj *pUser = pRow->pObj; SAcctObj *pAcct = mnodeGetAcct(pUser->acct); if (pAcct != NULL) { @@ -74,8 +74,8 @@ static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) { - SUserObj *pUser = pWMsg->pRow; +static int32_t mnodeUserActionUpdate(SSdbRow *pRow) { + SUserObj *pUser = pRow->pObj; SUserObj *pSaved = mnodeGetUser(pUser->user); if (pUser != pSaved) { memcpy(pSaved, pUser, tsUserUpdateSize); @@ -85,19 +85,19 @@ static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionEncode(SSWriteMsg *pWMsg) { - SUserObj *pUser = pWMsg->pRow; - memcpy(pWMsg->rowData, pUser, tsUserUpdateSize); - pWMsg->rowSize = tsUserUpdateSize; +static int32_t mnodeUserActionEncode(SSdbRow *pRow) { + SUserObj *pUser = pRow->pObj; + memcpy(pRow->rowData, pUser, tsUserUpdateSize); + pRow->rowSize = tsUserUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeUserActionDecode(SSdbRow *pRow) { SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj)); if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pUser, pWMsg->rowData, tsUserUpdateSize); - pWMsg->pRow = pUser; + memcpy(pUser, pRow->rowData, tsUserUpdateSize); + pRow->pObj = pUser; return TSDB_CODE_SUCCESS; } @@ -205,14 +205,14 @@ void mnodeDecUserRef(SUserObj *pUser) { } static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsUserSdb, - .pRow = pUser, + .pObj = pUser, .pMsg = pMsg }; - int32_t code = sdbUpdateRow(&wmsg); + int32_t code = sdbUpdateRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); } else { @@ -259,15 +259,15 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { pUser->superAuth = 1; } - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsUserSdb, - .pRow = pUser, + .pObj = pUser, .rowSize = sizeof(SUserObj), .pMsg = pMsg }; - code = sdbInsertRow(&wmsg); + code = sdbInsertRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); tfree(pUser); @@ -279,14 +279,14 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { } static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsUserSdb, - .pRow = pUser, + .pObj = pUser, .pMsg = pMsg }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); } else { @@ -562,12 +562,12 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { if (pUser == NULL) break; if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsUserSdb, - .pRow = pUser, + .pObj = pUser, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfUsers++; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 2eb11e1def..16e5a601e3 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -72,13 +72,13 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) { tfree(pVgroup); } -static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pWMsg) { - mnodeDestroyVgroup(pWMsg->pRow); +static int32_t mnodeVgroupActionDestroy(SSdbRow *pRow) { + mnodeDestroyVgroup(pRow->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) { - SVgObj *pVgroup = pWMsg->pRow; +static int32_t mnodeVgroupActionInsert(SSdbRow *pRow) { + SVgObj *pVgroup = pRow->pObj; // refer to db SDbObj *pDb = mnodeGetDb(pVgroup->dbName); @@ -115,8 +115,8 @@ static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) { - SVgObj *pVgroup = pWMsg->pRow; +static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) { + SVgObj *pVgroup = pRow->pObj; if (pVgroup->pDb == NULL) { mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName); @@ -137,8 +137,8 @@ static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) { - SVgObj *pNew = pWMsg->pRow; +static int32_t mnodeVgroupActionUpdate(SSdbRow *pRow) { + SVgObj *pNew = pRow->pObj; SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId); if (pVgroup != pNew) { @@ -176,25 +176,25 @@ static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionEncode(SSWriteMsg *pWMsg) { - SVgObj *pVgroup = pWMsg->pRow; - memcpy(pWMsg->rowData, pVgroup, tsVgUpdateSize); - SVgObj *pTmpVgroup = pWMsg->rowData; +static int32_t mnodeVgroupActionEncode(SSdbRow *pRow) { + SVgObj *pVgroup = pRow->pObj; + memcpy(pRow->rowData, pVgroup, tsVgUpdateSize); + SVgObj *pTmpVgroup = pRow->rowData; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { pTmpVgroup->vnodeGid[i].pDnode = NULL; pTmpVgroup->vnodeGid[i].role = 0; } - pWMsg->rowSize = tsVgUpdateSize; + pRow->rowSize = tsVgUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionDecode(SSWriteMsg *pWMsg) { +static int32_t mnodeVgroupActionDecode(SSdbRow *pRow) { SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj)); if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pVgroup, pWMsg->rowData, tsVgUpdateSize); - pWMsg->pRow = pVgroup; + memcpy(pVgroup, pRow->rowData, tsVgUpdateSize); + pRow->pObj = pVgroup; return TSDB_CODE_SUCCESS; } @@ -253,13 +253,13 @@ SVgObj *mnodeGetVgroup(int32_t vgId) { } void mnodeUpdateVgroup(SVgObj *pVgroup) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsVgroupSdb, - .pRow = pVgroup + .pObj = pVgroup }; - int32_t code = sdbUpdateRow(&wmsg); + int32_t code = sdbUpdateRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } @@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, tstrerror(code)); - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pVgroup, .pTable = tsVgroupSdb}; + SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; sdbDeleteRow(&desc); return code; } else { mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); pVgroup->status = TAOS_VG_STATUS_READY; - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pVgroup, .pTable = tsVgroupSdb}; + SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; (void)sdbUpdateRow(&desc); dnodeReprocessMWriteMsg(pMsg); @@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { // mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, // pDb->name, pVgroup->numOfVnodes); // pVgroup->status = TAOS_VG_STATUS_READY; - // SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pVgroup, .pTable = tsVgroupSdb}; + // SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; // (void)sdbUpdateRow(&desc); // dnodeReprocessMWriteMsg(pMsg); // return TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -571,16 +571,16 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { pMsg->pVgroup = pVgroup; mnodeIncVgroupRef(pVgroup); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsVgroupSdb, - .pRow = pVgroup, + .pObj = pVgroup, .rowSize = sizeof(SVgObj), .pMsg = pMsg, .fpReq = mnodeCreateVgroupFp }; - code = sdbInsertRow(&wmsg); + code = sdbInsertRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { pMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); @@ -595,12 +595,12 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { } else { mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mnodeSendDropVgroupMsg(pVgroup, NULL); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsVgroupSdb, - .pRow = pVgroup + .pObj = pVgroup }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); } } @@ -957,28 +957,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received == mnodeMsg->successed) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsVgroupSdb, - .pRow = pVgroup, + .pObj = pVgroup, .rowSize = sizeof(SVgObj), .pMsg = mnodeMsg, .fpRsp = mnodeCreateVgroupCb }; - int32_t code = sdbInsertRowToQueue(&wmsg); + int32_t code = sdbInsertRowToQueue(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); dnodeSendRpcMWriteRsp(mnodeMsg, code); } } else { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsVgroupSdb, - .pRow = pVgroup + .pObj = pVgroup }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code); } } @@ -1031,12 +1031,12 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received != mnodeMsg->expected) return; - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_GLOBAL, .pTable = tsVgroupSdb, - .pRow = pVgroup + .pObj = pVgroup }; - int32_t code = sdbDeleteRow(&wmsg); + int32_t code = sdbDeleteRow(&row); if (code != 0) { code = TSDB_CODE_MND_SDB_ERROR; } @@ -1084,12 +1084,12 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) { if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) { mnodeDropAllChildTablesInVgroups(pVgroup); - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsVgroupSdb, - .pRow = pVgroup, + .pObj = pVgroup, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfVgroups++; } mnodeDecVgroupRef(pVgroup); @@ -1135,12 +1135,12 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) { if (pVgroup == NULL) break; if (pVgroup->pDb == pDropDb) { - SSWriteMsg wmsg = { + SSdbRow row = { .type = SDB_OPER_LOCAL, .pTable = tsVgroupSdb, - .pRow = pVgroup, + .pObj = pVgroup, }; - sdbDeleteRow(&wmsg); + sdbDeleteRow(&row); numOfVgroups++; } -- GitLab