diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 938a8a40fd26afbbcfb77b5fdbe9937c16d88642..0b0933c08acf1cda585669304f8fb43291dfff54 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -20,7 +20,9 @@ extern "C" { #endif -struct SMnodeMsg; +#include "mnode.h" + +struct SSdbTable; typedef enum { SDB_TABLE_CLUSTER = 0, @@ -36,35 +38,35 @@ typedef enum { } ESdbTable; typedef enum { - SDB_KEY_STRING, - SDB_KEY_INT, - SDB_KEY_AUTO, - SDB_KEY_VAR_STRING, + SDB_KEY_STRING = 0, + SDB_KEY_INT = 1, + SDB_KEY_AUTO = 2, + SDB_KEY_VAR_STRING = 3, } ESdbKey; typedef enum { - SDB_OPER_GLOBAL, - SDB_OPER_LOCAL + SDB_OPER_GLOBAL = 0, + SDB_OPER_LOCAL = 1 } ESdbOper; typedef struct SSWriteMsg { ESdbOper type; + int32_t processedCount; // for sync fwd callback + int32_t retCode; // for callback in sdb queue int32_t rowSize; - int32_t retCode; // for callback in sdb queue - int32_t processedCount; // for sync fwd callback - int32_t (*reqFp)(struct SMnodeMsg *pMsg); - int32_t (*writeCb)(struct SMnodeMsg *pMsg, int32_t code); - void * table; - void * pObj; void * rowData; - struct SMnodeMsg *pMsg; + int32_t (*fpReq)(SMnodeMsg *pMsg); + int32_t (*fpWrite)(SMnodeMsg *pMsg, int32_t code); + void * pObj; + SMnodeMsg *pMsg; + struct SSdbTable *pTable; } SSWriteMsg; typedef struct { - char *tableName; - int32_t hashSessions; - int32_t maxRowSize; - int32_t refCountPos; + char * tableName; + int32_t hashSessions; + int32_t maxRowSize; + int32_t refCountPos; ESdbTable tableId; ESdbKey keyType; int32_t (*fpInsert)(SSWriteMsg *pWrite); @@ -89,15 +91,15 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite); int32_t sdbUpdateRow(SSWriteMsg *pWrite); int32_t sdbInsertRowImp(SSWriteMsg *pWrite); -void *sdbGetRow(void *handle, void *key); -void *sdbFetchRow(void *handle, void *pIter, void **ppRow); +void *sdbGetRow(void *pTable, void *key); +void *sdbFetchRow(void *pTable, void *pIter, void **ppRow); void sdbFreeIter(void *pIter); -void sdbIncRef(void *thandle, void *pRow); -void sdbDecRef(void *thandle, void *pRow); -int64_t sdbGetNumOfRows(void *handle); -int32_t sdbGetId(void *handle); +void sdbIncRef(void *pTable, void *pRow); +void sdbDecRef(void *pTable, void *pRow); +int64_t sdbGetNumOfRows(void *pTable); +int32_t sdbGetId(void *pTable); uint64_t sdbGetVersion(); -bool sdbCheckRowDeleted(void *thandle, void *pRow); +bool sdbCheckRowDeleted(void *pTable, void *pRow); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeAcct.c b/src/mnode/src/mnodeAcct.c index c6d366ebb3685e2d84dde55301a83d662738140e..697ff1d5a1c0f9ee2b366fc1f5025b92c35b411b 100644 --- a/src/mnode/src/mnodeAcct.c +++ b/src/mnode/src/mnodeAcct.c @@ -31,30 +31,30 @@ void * tsAcctSdb = NULL; static int32_t tsAcctUpdateSize; static int32_t mnodeCreateRootAcct(); -static int32_t mnodeAcctActionDestroy(SSWriteMsg *pOper) { - SAcctObj *pAcct = pOper->pObj; +static int32_t mnodeAcctActionDestroy(SSWriteMsg *pWMsg) { + SAcctObj *pAcct = pWMsg->pObj; pthread_mutex_destroy(&pAcct->mutex); - tfree(pOper->pObj); + tfree(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionInsert(SSWriteMsg *pOper) { - SAcctObj *pAcct = pOper->pObj; +static int32_t mnodeAcctActionInsert(SSWriteMsg *pWMsg) { + SAcctObj *pAcct = pWMsg->pObj; memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo)); pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS; pthread_mutex_init(&pAcct->mutex, NULL); return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionDelete(SSWriteMsg *pOper) { - SAcctObj *pAcct = pOper->pObj; +static int32_t mnodeAcctActionDelete(SSWriteMsg *pWMsg) { + SAcctObj *pAcct = pWMsg->pObj; mnodeDropAllUsers(pAcct); mnodeDropAllDbs(pAcct); return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionUpdate(SSWriteMsg *pOper) { - SAcctObj *pAcct = pOper->pObj; +static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) { + SAcctObj *pAcct = pWMsg->pObj; SAcctObj *pSaved = mnodeGetAcct(pAcct->user); if (pAcct != pSaved) { memcpy(pSaved, pAcct, tsAcctUpdateSize); @@ -64,19 +64,19 @@ static int32_t mnodeAcctActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionEncode(SSWriteMsg *pOper) { - SAcctObj *pAcct = pOper->pObj; - memcpy(pOper->rowData, pAcct, tsAcctUpdateSize); - pOper->rowSize = tsAcctUpdateSize; +static int32_t mnodeAcctActionEncode(SSWriteMsg *pWMsg) { + SAcctObj *pAcct = pWMsg->pObj; + memcpy(pWMsg->rowData, pAcct, tsAcctUpdateSize); + pWMsg->rowSize = tsAcctUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeAcctActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeAcctActionDecode(SSWriteMsg *pWMsg) { SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj)); if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pAcct, pOper->rowData, tsAcctUpdateSize); - pOper->pObj = pAcct; + memcpy(pAcct, pWMsg->rowData, tsAcctUpdateSize); + pWMsg->pObj = pAcct; return TSDB_CODE_SUCCESS; } @@ -226,13 +226,13 @@ static int32_t mnodeCreateRootAcct() { pAcct->acctId = sdbGetId(tsAcctSdb); pAcct->createdTime = taosGetTimestampMs(); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsAcctSdb, - .pObj = pAcct, + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsAcctSdb, + .pObj = pAcct, }; - return sdbInsertRow(&oper); + return sdbInsertRow(&wmsg); } #ifndef _ACCT diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 3b2b668a1f8d6623cbd3f3616138e145b826e5ae..c94c8332706d45cd346cc6f18202dcc7d9c89b55 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -32,36 +32,36 @@ static int32_t mnodeCreateCluster(); static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeClusterActionDestroy(SSWriteMsg *pOper) { - tfree(pOper->pObj); +static int32_t mnodeClusterActionDestroy(SSWriteMsg *pWMsg) { + tfree(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionInsert(SSWriteMsg *pOper) { +static int32_t mnodeClusterActionInsert(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionDelete(SSWriteMsg *pOper) { +static int32_t mnodeClusterActionDelete(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionUpdate(SSWriteMsg *pOper) { +static int32_t mnodeClusterActionUpdate(SSWriteMsg *pWMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionEncode(SSWriteMsg *pOper) { - SClusterObj *pCluster = pOper->pObj; - memcpy(pOper->rowData, pCluster, tsClusterUpdateSize); - pOper->rowSize = tsClusterUpdateSize; +static int32_t mnodeClusterActionEncode(SSWriteMsg *pWMsg) { + SClusterObj *pCluster = pWMsg->pObj; + memcpy(pWMsg->rowData, pCluster, tsClusterUpdateSize); + pWMsg->rowSize = tsClusterUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeClusterActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeClusterActionDecode(SSWriteMsg *pWMsg) { SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj)); if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pCluster, pOper->rowData, tsClusterUpdateSize); - pOper->pObj = pCluster; + memcpy(pCluster, pWMsg->rowData, tsClusterUpdateSize); + pWMsg->pObj = pCluster; return TSDB_CODE_SUCCESS; } @@ -145,13 +145,13 @@ static int32_t mnodeCreateCluster() { mDebug("uid is %s", pCluster->uid); } - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsClusterSdb, - .pObj = pCluster, + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsClusterSdb, + .pObj = pCluster, }; - return sdbInsertRow(&oper); + return sdbInsertRow(&wmsg); } const char* mnodeGetClusterId() { diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 403213c22f16936c1e5f652e818a20f757ba7c82..3c2bfbb8341a6998644d29008ea4aa31f7e62aa7 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -56,8 +56,8 @@ static void mnodeDestroyDb(SDbObj *pDb) { tfree(pDb); } -static int32_t mnodeDbActionDestroy(SSWriteMsg *pOper) { - mnodeDestroyDb(pOper->pObj); +static int32_t mnodeDbActionDestroy(SSWriteMsg *pWMsg) { + mnodeDestroyDb(pWMsg->pObj); return TSDB_CODE_SUCCESS; } @@ -65,8 +65,8 @@ int64_t mnodeGetDbNum() { return sdbGetNumOfRows(tsDbSdb); } -static int32_t mnodeDbActionInsert(SSWriteMsg *pOper) { - SDbObj *pDb = pOper->pObj; +static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) { + SDbObj *pDb = pWMsg->pObj; SAcctObj *pAcct = mnodeGetAcct(pDb->acct); pthread_mutex_init(&pDb->mutex, NULL); @@ -91,8 +91,8 @@ static int32_t mnodeDbActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionDelete(SSWriteMsg *pOper) { - SDbObj *pDb = pOper->pObj; +static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) { + SDbObj *pDb = pWMsg->pObj; SAcctObj *pAcct = mnodeGetAcct(pDb->acct); mnodeDropAllChildTables(pDb); @@ -107,11 +107,11 @@ static int32_t mnodeDbActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionUpdate(SSWriteMsg *pOper) { - SDbObj *pNew = pOper->pObj; +static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) { + SDbObj *pNew = pWMsg->pObj; SDbObj *pDb = mnodeGetDb(pNew->name); if (pDb != NULL && pNew != pDb) { - memcpy(pDb, pNew, pOper->rowSize); + memcpy(pDb, pNew, pWMsg->rowSize); free(pNew->vgList); free(pNew); } @@ -120,19 +120,19 @@ static int32_t mnodeDbActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionEncode(SSWriteMsg *pOper) { - SDbObj *pDb = pOper->pObj; - memcpy(pOper->rowData, pDb, tsDbUpdateSize); - pOper->rowSize = tsDbUpdateSize; +static int32_t mnodeDbActionEncode(SSWriteMsg *pWMsg) { + SDbObj *pDb = pWMsg->pObj; + memcpy(pWMsg->rowData, pDb, tsDbUpdateSize); + pWMsg->rowSize = tsDbUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeDbActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeDbActionDecode(SSWriteMsg *pWMsg) { SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj)); if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pDb, pOper->rowData, tsDbUpdateSize); - pOper->pObj = pDb; + memcpy(pDb, pWMsg->rowData, tsDbUpdateSize); + pWMsg->pObj = pDb; return TSDB_CODE_SUCCESS; } @@ -412,16 +412,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * pMsg->pDb = pDb; mnodeIncDbRef(pDb); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDbSdb, - .pObj = pDb, - .rowSize = sizeof(SDbObj), - .pMsg = pMsg, - .writeCb = mnodeCreateDbCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .rowSize = sizeof(SDbObj), + .pMsg = pMsg, + .fpWrite = mnodeCreateDbCb }; - code = sdbInsertRow(&oper); + code = sdbInsertRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code)); pMsg->pDb = NULL; @@ -440,8 +440,8 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { } #if 0 -void mnodePrintVgroups(SDbObj *pDb, char *oper) { - mInfo("db:%s, vgroup link from head, oper:%s", pDb->name, oper); +void mnodePrintVgroups(SDbObj *pDb, char *wmsg) { + mInfo("db:%s, vgroup link from head, wmsg:%s", pDb->name, wmsg); SVgObj *pVgroup = pDb->pHead; while (pVgroup != NULL) { mInfo("vgId:%d", pVgroup->vgId); @@ -807,13 +807,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) { if (pDb->status) return TSDB_CODE_SUCCESS; pDb->status = true; - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDbSdb, - .pObj = pDb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb }; - int32_t code = sdbUpdateRow(&oper); + int32_t code = sdbUpdateRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code)); } @@ -1019,15 +1019,15 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { pDb->cfg = newCfg; pDb->cfgVersion++; - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDbSdb, - .pObj = pDb, - .pMsg = pMsg, - .writeCb = mnodeAlterDbCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + .fpWrite = mnodeAlterDbCb }; - code = sdbUpdateRow(&oper); + code = sdbUpdateRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code)); } @@ -1071,15 +1071,15 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { SDbObj *pDb = pMsg->pDb; mInfo("db:%s, drop db from sdb", pDb->name); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDbSdb, - .pObj = pDb, - .pMsg = pMsg, - .writeCb = mnodeDropDbCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + .fpWrite = mnodeDropDbCb }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code)); } @@ -1134,13 +1134,13 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { if (pDb->pAcct == pAcct) { mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user); - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsDbSdb, - .pObj = pDb + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsDbSdb, + .pObj = pDb }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfDbs++; } mnodeDecDbRef(pDb); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 0dd43fef895aab9df9b562deb5686e412e7e4820..617499967e1eb2c711095ef4ca8cdbc655e2d42d 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -87,13 +87,13 @@ static char* offlineReason[] = { "unknown", }; -static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pOper) { - tfree(pOper->pObj); +static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pWMsg) { + tfree(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionInsert(SSWriteMsg *pOper) { - SDnodeObj *pDnode = pOper->pObj; +static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) { + SDnodeObj *pDnode = pWMsg->pObj; if (pDnode->status != TAOS_DN_STATUS_DROPPING) { pDnode->status = TAOS_DN_STATUS_OFFLINE; pDnode->lastAccess = tsAccessSquence; @@ -107,8 +107,8 @@ static int32_t mnodeDnodeActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionDelete(SSWriteMsg *pOper) { - SDnodeObj *pDnode = pOper->pObj; +static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) { + SDnodeObj *pDnode = pWMsg->pObj; #ifndef _SYNC mnodeDropAllDnodeVgroups(pDnode); @@ -121,11 +121,11 @@ static int32_t mnodeDnodeActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pOper) { - SDnodeObj *pNew = pOper->pObj; +static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) { + SDnodeObj *pNew = pWMsg->pObj; SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId); if (pDnode != NULL && pNew != pDnode) { - memcpy(pDnode, pNew, pOper->rowSize); + memcpy(pDnode, pNew, pWMsg->rowSize); free(pNew); } mnodeDecDnodeRef(pDnode); @@ -134,19 +134,19 @@ static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionEncode(SSWriteMsg *pOper) { - SDnodeObj *pDnode = pOper->pObj; - memcpy(pOper->rowData, pDnode, tsDnodeUpdateSize); - pOper->rowSize = tsDnodeUpdateSize; +static int32_t mnodeDnodeActionEncode(SSWriteMsg *pWMsg) { + SDnodeObj *pDnode = pWMsg->pObj; + memcpy(pWMsg->rowData, pDnode, tsDnodeUpdateSize); + pWMsg->rowSize = tsDnodeUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeDnodeActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeDnodeActionDecode(SSWriteMsg *pWMsg) { SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj)); if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pDnode, pOper->rowData, tsDnodeUpdateSize); - pOper->pObj = pDnode; + memcpy(pDnode, pWMsg->rowData, tsDnodeUpdateSize); + pWMsg->pObj = pDnode; return TSDB_CODE_SUCCESS; } @@ -296,13 +296,13 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) { } void mnodeUpdateDnode(SDnodeObj *pDnode) { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDnodeSdb, - .pObj = pDnode + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDnodeSdb, + .pObj = pDnode }; - int32_t code = sdbUpdateRow(&oper); + int32_t code = sdbUpdateRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnodeId:%d, failed update", pDnode->dnodeId); } @@ -644,15 +644,15 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN); taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDnodeSdb, - .pObj = pDnode, + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDnodeSdb, + .pObj = pDnode, .rowSize = sizeof(SDnodeObj), - .pMsg = pMsg + .pMsg = pMsg }; - int32_t code = sdbInsertRow(&oper); + int32_t code = sdbInsertRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { int dnodeId = pDnode->dnodeId; tfree(pDnode); @@ -665,14 +665,14 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { } int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDnodeSdb, - .pObj = pDnode, - .pMsg = pMsg + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDnodeSdb, + .pObj = pDnode, + .pMsg = pMsg }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); } else { diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index bf9033c5cdfaa3d3dc6ec09d961a451b6b2458f0..37915319e59b90f7d63f9e3398bf8f313326b8af 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -58,13 +58,13 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo #define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock) #endif -static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pOper) { - tfree(pOper->pObj); +static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pWMsg) { + tfree(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionInsert(SSWriteMsg *pOper) { - SMnodeObj *pMnode = pOper->pObj; +static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) { + SMnodeObj *pMnode = pWMsg->pObj; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -76,8 +76,8 @@ static int32_t mnodeMnodeActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionDelete(SSWriteMsg *pOper) { - SMnodeObj *pMnode = pOper->pObj; +static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) { + SMnodeObj *pMnode = pWMsg->pObj; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -88,30 +88,30 @@ static int32_t mnodeMnodeActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pOper) { - SMnodeObj *pMnode = pOper->pObj; +static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pWMsg) { + SMnodeObj *pMnode = pWMsg->pObj; SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId); if (pMnode != pSaved) { - memcpy(pSaved, pMnode, pOper->rowSize); + memcpy(pSaved, pMnode, pWMsg->rowSize); free(pMnode); } mnodeDecMnodeRef(pSaved); return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionEncode(SSWriteMsg *pOper) { - SMnodeObj *pMnode = pOper->pObj; - memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize); - pOper->rowSize = tsMnodeUpdateSize; +static int32_t mnodeMnodeActionEncode(SSWriteMsg *pWMsg) { + SMnodeObj *pMnode = pWMsg->pObj; + memcpy(pWMsg->rowData, pMnode, tsMnodeUpdateSize); + pWMsg->rowSize = tsMnodeUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeMnodeActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeMnodeActionDecode(SSWriteMsg *pWMsg) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pMnode, pOper->rowData, tsMnodeUpdateSize); - pOper->pObj = pMnode; + memcpy(pMnode, pWMsg->rowData, tsMnodeUpdateSize); + pWMsg->pObj = pMnode; return TSDB_CODE_SUCCESS; } @@ -329,11 +329,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { pMnode->mnodeId = dnodeId; pMnode->createdTime = taosGetTimestampMs(); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsMnodeSdb, - .pObj = pMnode, - .writeCb = mnodeCreateMnodeCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsMnodeSdb, + .pObj = pMnode, + .fpWrite = mnodeCreateMnodeCb }; int32_t code = TSDB_CODE_SUCCESS; @@ -346,7 +346,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { return; } - code = sdbInsertRow(&oper); + code = sdbInsertRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code)); tfree(pMnode); @@ -356,8 +356,8 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { void mnodeDropMnodeLocal(int32_t dnodeId) { SMnodeObj *pMnode = mnodeGetMnode(dnodeId); if (pMnode != NULL) { - SSWriteMsg oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode}; - sdbDeleteRow(&oper); + SSWriteMsg wmsg = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pObj = pMnode}; + sdbDeleteRow(&wmsg); mnodeDecMnodeRef(pMnode); } @@ -371,13 +371,13 @@ int32_t mnodeDropMnode(int32_t dnodeId) { return TSDB_CODE_MND_DNODE_NOT_EXIST; } - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsMnodeSdb, - .pObj = pMnode + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsMnodeSdb, + .pObj = pMnode }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); sdbDecRef(tsMnodeSdb, pMnode); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 59812068d4a8910110f7d95493ae197647f25c4e..034ff870c506d1e841c73d039d7886a4e3409343 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -83,12 +83,12 @@ typedef struct { typedef struct { pthread_t thread; int32_t workerId; -} SSWriteWorker; +} SSdbWorker; typedef struct { int32_t num; - SSWriteWorker *worker; -} SSWriteWorkerPool; + SSdbWorker *worker; +} SSdbWorkerPool; extern void * tsMnodeTmr; static void * tsSdbTmr; @@ -96,27 +96,28 @@ static SSdbObject tsSdbObj = {0}; static taos_qset tsSdbWQset; static taos_qall tsSdbWQall; static taos_queue tsSdbWQueue; -static SSWriteWorkerPool tsSdbPool; - -static int32_t sdbWrite(void *wparam, void *data, int32_t type, void *pMsg); -static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg); -static void * sdbWorkerFp(void *param); -static int32_t sdbInitWriteWorker(); -static void sdbCleanupWriteWorker(); -static int32_t sdbAllocWriteQueue(); -static void sdbFreeWritequeue(); +static SSdbWorkerPool tsSdbPool; + +static int32_t sdbWrite(void *pWrite, void *pHead, int32_t qtype, void *unused); +static int32_t sdbWriteToQueue(void *pWrite, void *pHead, int32_t qtype, void *unused); +static void * sdbWorkerFp(void *pWorker); +static int32_t sdbInitWorker(); +static void sdbCleanupWorker(); +static int32_t sdbAllocQueue(); +static void sdbFreeQueue(); +extern int32_t sdbInsertRowImp(SSWriteMsg *pWrite); static int32_t sdbUpdateRowImp(SSWriteMsg *pWrite); static int32_t sdbDeleteRowImp(SSWriteMsg *pWrite); static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite); -int32_t sdbGetId(void *handle) { - return ((SSdbTable *)handle)->autoIndex; +int32_t sdbGetId(void *pTable) { + return ((SSdbTable *)pTable)->autoIndex; } -int64_t sdbGetNumOfRows(void *handle) { - return ((SSdbTable *)handle)->numOfRows; +int64_t sdbGetNumOfRows(void *pTable) { + return ((SSdbTable *)pTable)->numOfRows; } uint64_t sdbGetVersion() { @@ -276,27 +277,27 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { SWalHead *pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; int32_t action = pHead->msgType % 10; sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pWrite->pObj, - sdbGetKeyStr(pWrite->table, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode)); + sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode)); if (action == SDB_ACTION_INSERT) { // It's better to create a table in two stages, create it first and then set it success - //sdbDeleteHash(pWrite->table, pWrite); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = pWrite->table, - .pObj = pWrite->pObj + //sdbDeleteHash(pWrite->pTable, pWrite); + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = pWrite->pTable, + .pObj = pWrite->pObj }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); } } - if (pWrite->writeCb != NULL) { - pWrite->retCode = (*pWrite->writeCb)(pMsg, pWrite->retCode); + if (pWrite->fpWrite != NULL) { + pWrite->retCode = (*pWrite->fpWrite)(pMsg, pWrite->retCode); } dnodeSendRpcMWriteRsp(pMsg, pWrite->retCode); // if ahandle, means this func is called by sdb write if (ahandle == NULL) { - sdbDecRef(pWrite->table, pWrite->pObj); + sdbDecRef(pWrite->pTable, pWrite->pObj); } taosFreeQitem(pWrite); @@ -403,7 +404,7 @@ void sdbUpdateSync(void *pMnodes) { int32_t sdbInit() { pthread_mutex_init(&tsSdbObj.mutex, NULL); - if (sdbInitWriteWorker() != 0) { + if (sdbInitWorker() != 0) { return -1; } @@ -426,7 +427,7 @@ void sdbCleanUp() { tsSdbObj.status = SDB_STATUS_CLOSING; - sdbCleanupWriteWorker(); + sdbCleanupWorker(); sdbDebug("vgId:1, sdb will be closed, mver:%" PRIu64, tsSdbObj.version); if (tsSdbObj.sync) { @@ -442,19 +443,19 @@ void sdbCleanUp() { pthread_mutex_destroy(&tsSdbObj.mutex); } -void sdbIncRef(void *handle, void *pObj) { - if (pObj == NULL || handle == NULL) return; +void sdbIncRef(void *tparam, void *pObj) { + if (pObj == NULL || tparam == NULL) return; - SSdbTable *pTable = handle; + SSdbTable *pTable = tparam; int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t refCount = atomic_add_fetch_32(pRefCount, 1); sdbTrace("vgId:1, sdb:%s, inc ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); } -void sdbDecRef(void *handle, void *pObj) { - if (pObj == NULL || handle == NULL) return; +void sdbDecRef(void *tparam, void *pObj) { + if (pObj == NULL || tparam == NULL) return; - SSdbTable *pTable = handle; + SSdbTable *pTable = tparam; int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); sdbTrace("vgId:1, sdb:%s, dec ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); @@ -462,8 +463,8 @@ void sdbDecRef(void *handle, void *pObj) { int32_t *updateEnd = pObj + pTable->refCountPos - 4; if (refCount <= 0 && *updateEnd) { sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); - SSWriteMsg oper = {.pObj = pObj}; - (*pTable->fpDestroy)(&oper); + SSWriteMsg wmsg = {.pObj = pObj}; + (*pTable->fpDestroy)(&wmsg); } } @@ -485,12 +486,12 @@ static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key)); } -void *sdbGetRow(void *handle, void *key) { - SSdbTable *pTable = handle; +void *sdbGetRow(void *tparam, void *key) { + SSdbTable *pTable = tparam; pthread_mutex_lock(&pTable->mutex); - void *pRow = sdbGetRowMeta(handle, key); - if (pRow) sdbIncRef(handle, pRow); + void *pRow = sdbGetRowMeta(pTable, key); + if (pRow) sdbIncRef(pTable, pRow); pthread_mutex_unlock(&pTable->mutex); return pRow; @@ -573,9 +574,9 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) { return TSDB_CODE_SUCCESS; } -static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { - SSWriteMsg *pWrite = param; - SWalHead *pHead = data; +static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { + SSWriteMsg *pWrite = wparam; + SWalHead *pHead = hparam; int32_t tableId = pHead->msgType / 10; int32_t action = pHead->msgType % 10; @@ -593,12 +594,12 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { if (pHead->version <= tsSdbObj.version) { pthread_mutex_unlock(&tsSdbObj.mutex); sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbObj.version); return TSDB_CODE_SUCCESS; } else if (pHead->version != tsSdbObj.version + 1) { pthread_mutex_unlock(&tsSdbObj.mutex); sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbObj.version); return TSDB_CODE_SYN_INVALID_VERSION; } else { tsSdbObj.version = pHead->version; @@ -613,7 +614,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { pthread_mutex_unlock(&tsSdbObj.mutex); - // from app, oper is created + // from app, wmsg is created if (pWrite != NULL) { // forward to peers pWrite->processedCount = 0; @@ -639,11 +640,11 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { // even it is WAL/FWD, it shall be called to update version in sync syncForwardToPeer(tsSdbObj.sync, pHead, pWrite, TAOS_QTYPE_RPC); - // from wal or forward msg, oper not created, should add into hash + // from wal or forward msg, wmsg not created, should add into hash if (action == SDB_ACTION_INSERT) { - SSWriteMsg oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; - code = (*pTable->fpDecode)(&oper); - return sdbInsertHash(pTable, &oper); + SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; + code = (*pTable->fpDecode)(&wmsg); + return sdbInsertHash(pTable, &wmsg); } else if (action == SDB_ACTION_DELETE) { void *pRow = sdbGetRowMeta(pTable, pHead->cont); if (pRow == NULL) { @@ -651,8 +652,8 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { sdbGetKeyStr(pTable, pHead->cont)); return TSDB_CODE_SUCCESS; } - SSWriteMsg oper = {.table = pTable, .pObj = pRow}; - return sdbDeleteHash(pTable, &oper); + SSWriteMsg wmsg = {.pTable = pTable, .pObj = pRow}; + return sdbDeleteHash(pTable, &wmsg); } else if (action == SDB_ACTION_UPDATE) { void *pRow = sdbGetRowMeta(pTable, pHead->cont); if (pRow == NULL) { @@ -660,16 +661,16 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { sdbGetKeyStr(pTable, pHead->cont)); return TSDB_CODE_SUCCESS; } - SSWriteMsg oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; - code = (*pTable->fpDecode)(&oper); - return sdbUpdateHash(pTable, &oper); + SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; + code = (*pTable->fpDecode)(&wmsg); + return sdbUpdateHash(pTable, &wmsg); } else { return TSDB_CODE_MND_INVALID_MSG_TYPE; } } int32_t sdbInsertRow(SSWriteMsg *pWrite) { - SSdbTable *pTable = (SSdbTable *)pWrite->table; + SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (sdbGetRowFromObj(pTable, pWrite->pObj)) { @@ -699,15 +700,15 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) { return TSDB_CODE_SUCCESS; } - if (pWrite->reqFp) { - return (*pWrite->reqFp)(pWrite->pMsg); + if (pWrite->fpReq) { + return (*pWrite->fpReq)(pWrite->pMsg); } else { return sdbInsertRowImp(pWrite); } } int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { - SSdbTable *pTable = (SSdbTable *)pWrite->table; + SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; @@ -729,14 +730,14 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj)); } - sdbIncRef(pNewOper->table, pNewOper->pObj); + sdbIncRef(pNewOper->pTable, pNewOper->pObj); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { - SSdbTable *pTable = pTableInput; +bool sdbCheckRowDeleted(void *tparam, void *pRow) { + SSdbTable *pTable = tparam; if (pTable == NULL) return false; int32_t *updateEnd = pRow + pTable->refCountPos - 4; @@ -744,7 +745,7 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { } int32_t sdbDeleteRow(SSWriteMsg *pWrite) { - SSdbTable *pTable = (SSdbTable *)pWrite->table; + SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj); @@ -768,15 +769,15 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) { return TSDB_CODE_SUCCESS; } - if (pWrite->reqFp) { - return (*pWrite->reqFp)(pWrite->pMsg); + if (pWrite->fpReq) { + return (*pWrite->fpReq)(pWrite->pMsg); } else { return sdbDeleteRowImp(pWrite); } } int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { - SSdbTable *pTable = (SSdbTable *)pWrite->table; + SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; @@ -803,7 +804,7 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { } int32_t sdbUpdateRow(SSWriteMsg *pWrite) { - SSdbTable *pTable = (SSdbTable *)pWrite->table; + SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj); @@ -823,15 +824,15 @@ int32_t sdbUpdateRow(SSWriteMsg *pWrite) { return TSDB_CODE_SUCCESS; } - if (pWrite->reqFp) { - return (*pWrite->reqFp)(pWrite->pMsg); + if (pWrite->fpReq) { + return (*pWrite->fpReq)(pWrite->pMsg); } else { return sdbUpdateRowImp(pWrite); } } int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { - SSdbTable *pTable = (SSdbTable *)pWrite->table; + SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; @@ -852,14 +853,14 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj)); } - sdbIncRef(pNewOper->table, pNewOper->pObj); + sdbIncRef(pNewOper->pTable, pNewOper->pObj); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { - SSdbTable *pTable = (SSdbTable *)handle; +void *sdbFetchRow(void *tparam, void *pNode, void **ppRow) { + SSdbTable *pTable = tparam; *ppRow = NULL; if (pTable == NULL) return NULL; @@ -880,7 +881,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { } *ppRow = *ppMetaRow; - sdbIncRef(handle, *ppMetaRow); + sdbIncRef(pTable, *ppMetaRow); return pIter; } @@ -934,12 +935,12 @@ void sdbCloseTable(void *handle) { void **ppRow = taosHashIterGet(pIter); if (ppRow == NULL) continue; - SSWriteMsg oper = { + SSWriteMsg wmsg = { .pObj = *ppRow, - .table = pTable, + .pTable = pTable, }; - (*pTable->fpDestroy)(&oper); + (*pTable->fpDestroy)(&wmsg); } taosHashDestroyIter(pIter); @@ -950,44 +951,44 @@ void sdbCloseTable(void *handle) { free(pTable); } -int32_t sdbInitWriteWorker() { +int32_t sdbInitWorker() { tsSdbPool.num = 1; - tsSdbPool.worker = (SSWriteWorker *)calloc(sizeof(SSWriteWorker), tsSdbPool.num); + tsSdbPool.worker = calloc(sizeof(SSdbWorker), tsSdbPool.num); if (tsSdbPool.worker == NULL) return -1; for (int32_t i = 0; i < tsSdbPool.num; ++i) { - SSWriteWorker *pWorker = tsSdbPool.worker + i; + SSdbWorker *pWorker = tsSdbPool.worker + i; pWorker->workerId = i; } - sdbAllocWriteQueue(); + sdbAllocQueue(); mInfo("vgId:1, sdb write is opened"); return 0; } -void sdbCleanupWriteWorker() { +void sdbCleanupWorker() { for (int32_t i = 0; i < tsSdbPool.num; ++i) { - SSWriteWorker *pWorker = tsSdbPool.worker + i; + SSdbWorker *pWorker = tsSdbPool.worker + i; if (pWorker->thread) { taosQsetThreadResume(tsSdbWQset); } } for (int32_t i = 0; i < tsSdbPool.num; ++i) { - SSWriteWorker *pWorker = tsSdbPool.worker + i; + SSdbWorker *pWorker = tsSdbPool.worker + i; if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } } - sdbFreeWritequeue(); + sdbFreeQueue(); tfree(tsSdbPool.worker); mInfo("vgId:1, sdb write is closed"); } -int32_t sdbAllocWriteQueue() { +int32_t sdbAllocQueue() { tsSdbWQueue = taosOpenQueue(); if (tsSdbWQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -1006,7 +1007,7 @@ int32_t sdbAllocWriteQueue() { } for (int32_t i = 0; i < tsSdbPool.num; ++i) { - SSWriteWorker *pWorker = tsSdbPool.worker + i; + SSdbWorker *pWorker = tsSdbPool.worker + i; pWorker->workerId = i; pthread_attr_t thAttr; @@ -1029,7 +1030,7 @@ int32_t sdbAllocWriteQueue() { return TSDB_CODE_SUCCESS; } -void sdbFreeWritequeue() { +void sdbFreeQueue() { taosCloseQueue(tsSdbWQueue); taosFreeQall(tsSdbWQall); taosCloseQset(tsSdbWQset); @@ -1038,8 +1039,8 @@ void sdbFreeWritequeue() { tsSdbWQueue = NULL; } -int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) { - SWalHead *pHead = data; +int32_t sdbWriteToQueue(void *wparam, void *hparam, int32_t qtype, void *unsed) { + SWalHead *pHead = hparam; int32_t size = sizeof(SWalHead) + pHead->len; SWalHead *pWal = taosAllocateQitem(size); memcpy(pWal, pHead, size); @@ -1048,10 +1049,10 @@ int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) { return 0; } -static void *sdbWorkerFp(void *param) { +static void *sdbWorkerFp(void *pWorker) { SWalHead *pHead; SSWriteMsg *pWrite; - int32_t type; + int32_t qtype; int32_t numOfMsgs; void * item; void * unUsed; @@ -1064,22 +1065,22 @@ static void *sdbWorkerFp(void *param) { } for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(tsSdbWQall, &type, &item); - if (type == TAOS_QTYPE_RPC) { + taosGetQitem(tsSdbWQall, &qtype, &item); + if (qtype == TAOS_QTYPE_RPC) { pWrite = (SSWriteMsg *)item; pWrite->processedCount = 1; pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; if (pWrite->pMsg != NULL) { sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", - pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, ((SSdbTable *)pWrite->table)->tableName, pWrite->pObj, - sdbGetKeyStr(pWrite->table, pHead->cont), pHead->version); + pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->tableName, pWrite->pObj, + sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version); } } else { pHead = (SWalHead *)item; pWrite = NULL; } - int32_t code = sdbWrite(pWrite, pHead, type, NULL); + int32_t code = sdbWrite(pWrite, pHead, qtype, NULL); if (code > 0) code = 0; if (pWrite) { pWrite->retCode = code; @@ -1093,12 +1094,12 @@ static void *sdbWorkerFp(void *param) { // browse all items, and process them one by one taosResetQitems(tsSdbWQall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(tsSdbWQall, &type, &item); + taosGetQitem(tsSdbWQall, &qtype, &item); - if (type == TAOS_QTYPE_RPC) { + if (qtype == TAOS_QTYPE_RPC) { pWrite = (SSWriteMsg *)item; sdbConfirmForward(NULL, pWrite, pWrite->retCode); - } else if (type == TAOS_QTYPE_FWD) { + } else if (qtype == TAOS_QTYPE_FWD) { pHead = (SWalHead *)item; syncConfirmForward(tsSdbObj.sync, pHead->version, pHead->len); taosFreeQitem(item); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index d3ba149f39f184a993a8767d87ede472607253d0..1ba476cf2a9a513e2e680bebb98fad29c1ff01d1 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -99,13 +99,13 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) { tfree(pTable); } -static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pOper) { - mnodeDestroyChildTable(pOper->pObj); +static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pWMsg) { + mnodeDestroyChildTable(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionInsert(SSWriteMsg *pOper) { - SCTableObj *pTable = pOper->pObj; +static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) { + SCTableObj *pTable = pWMsg->pObj; SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -153,8 +153,8 @@ static int32_t mnodeChildTableActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionDelete(SSWriteMsg *pOper) { - SCTableObj *pTable = pOper->pObj; +static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) { + SCTableObj *pTable = pWMsg->pObj; if (pTable->vgId == 0) { return TSDB_CODE_MND_VGROUP_NOT_EXIST; } @@ -189,8 +189,8 @@ static int32_t mnodeChildTableActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pOper) { - SCTableObj *pNew = pOper->pObj; +static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) { + SCTableObj *pNew = pWMsg->pObj; SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); if (pTable != pNew) { void *oldTableId = pTable->info.tableId; @@ -216,50 +216,50 @@ static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionEncode(SSWriteMsg *pOper) { - SCTableObj *pTable = pOper->pObj; - assert(pTable != NULL && pOper->rowData != NULL); +static int32_t mnodeChildTableActionEncode(SSWriteMsg *pWMsg) { + SCTableObj *pTable = pWMsg->pObj; + assert(pTable != NULL && pWMsg->rowData != NULL); int32_t len = strlen(pTable->info.tableId); if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID; - memcpy(pOper->rowData, pTable->info.tableId, len); - memset(pOper->rowData + len, 0, 1); + memcpy(pWMsg->rowData, pTable->info.tableId, len); + memset(pWMsg->rowData + len, 0, 1); len++; - memcpy(pOper->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); + memcpy(pWMsg->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - memcpy(pOper->rowData + len, pTable->schema, schemaSize); + memcpy(pWMsg->rowData + len, pTable->schema, schemaSize); len += schemaSize; if (pTable->sqlLen != 0) { - memcpy(pOper->rowData + len, pTable->sql, pTable->sqlLen); + memcpy(pWMsg->rowData + len, pTable->sql, pTable->sqlLen); len += pTable->sqlLen; } } - pOper->rowSize = len; + pWMsg->rowSize = len; return TSDB_CODE_SUCCESS; } -static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) { - assert(pOper->rowData != NULL); +static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) { + assert(pWMsg->rowData != NULL); SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - int32_t len = strlen(pOper->rowData); + int32_t len = strlen(pWMsg->rowData); if (len >= TSDB_TABLE_FNAME_LEN) { free(pTable); return TSDB_CODE_MND_INVALID_TABLE_ID; } - pTable->info.tableId = strdup(pOper->rowData); + pTable->info.tableId = strdup(pWMsg->rowData); len++; - memcpy((char*)pTable + sizeof(char *), pOper->rowData + len, tsChildTableUpdateSize); + memcpy((char*)pTable + sizeof(char *), pWMsg->rowData + len, tsChildTableUpdateSize); len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { @@ -269,7 +269,7 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_INVALID_TABLE_TYPE; } - memcpy(pTable->schema, pOper->rowData + len, schemaSize); + memcpy(pTable->schema, pWMsg->rowData + len, schemaSize); len += schemaSize; if (pTable->sqlLen != 0) { @@ -278,11 +278,11 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_OUT_OF_MEMORY; } - memcpy(pTable->sql, pOper->rowData + len, pTable->sqlLen); + memcpy(pTable->sql, pWMsg->rowData + len, pTable->sqlLen); } } - pOper->pObj = pTable; + pWMsg->pObj = pTable; return TSDB_CODE_SUCCESS; } @@ -297,7 +297,7 @@ static int32_t mnodeChildTableActionRestored() { SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId); if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) { mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId); - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; + SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); mnodeDecDbRef(pDb); @@ -309,7 +309,7 @@ static int32_t mnodeChildTableActionRestored() { if (pVgroup == NULL) { mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid); pTable->vgId = 0; - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; + SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); continue; @@ -320,7 +320,7 @@ static int32_t mnodeChildTableActionRestored() { mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it", pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid); pTable->vgId = 0; - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; + SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); continue; @@ -331,7 +331,7 @@ static int32_t mnodeChildTableActionRestored() { if (pSuperTable == NULL) { mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid); pTable->vgId = 0; - SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; + SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); mnodeDecTableRef(pTable); continue; @@ -430,13 +430,13 @@ static void mnodeDestroySuperTable(SSTableObj *pStable) { tfree(pStable); } -static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pOper) { - mnodeDestroySuperTable(pOper->pObj); +static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pWMsg) { + mnodeDestroySuperTable(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pOper) { - SSTableObj *pStable = pOper->pObj; +static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) { + SSTableObj *pStable = pWMsg->pObj; SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) { mnodeAddSuperTableIntoDb(pDb); @@ -446,8 +446,8 @@ static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pOper) { - SSTableObj *pStable = pOper->pObj; +static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) { + SSTableObj *pStable = pWMsg->pObj; SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); if (pDb != NULL) { mnodeRemoveSuperTableFromDb(pDb); @@ -458,8 +458,8 @@ static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pOper) { - SSTableObj *pNew = pOper->pObj; +static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) { + SSTableObj *pNew = pWMsg->pObj; SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); if (pTable != NULL && pTable != pNew) { void *oldTableId = pTable->info.tableId; @@ -483,43 +483,43 @@ static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pOper) { - SSTableObj *pStable = pOper->pObj; - assert(pOper->pObj != NULL && pOper->rowData != NULL); +static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pWMsg) { + SSTableObj *pStable = pWMsg->pObj; + assert(pWMsg->pObj != NULL && pWMsg->rowData != NULL); int32_t len = strlen(pStable->info.tableId); if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID; - memcpy(pOper->rowData, pStable->info.tableId, len); - memset(pOper->rowData + len, 0, 1); + memcpy(pWMsg->rowData, pStable->info.tableId, len); + memset(pWMsg->rowData + len, 0, 1); len++; - memcpy(pOper->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); + memcpy(pWMsg->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); - memcpy(pOper->rowData + len, pStable->schema, schemaSize); + memcpy(pWMsg->rowData + len, pStable->schema, schemaSize); len += schemaSize; - pOper->rowSize = len; + pWMsg->rowSize = len; return TSDB_CODE_SUCCESS; } -static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pOper) { - assert(pOper->rowData != NULL); +static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) { + assert(pWMsg->rowData != NULL); SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - int32_t len = strlen(pOper->rowData); + int32_t len = strlen(pWMsg->rowData); if (len >= TSDB_TABLE_FNAME_LEN){ free(pStable); return TSDB_CODE_MND_INVALID_TABLE_ID; } - pStable->info.tableId = strdup(pOper->rowData); + pStable->info.tableId = strdup(pWMsg->rowData); len++; - memcpy((char*)pStable + sizeof(char *), pOper->rowData + len, tsSuperTableUpdateSize); + memcpy((char*)pStable + sizeof(char *), pWMsg->rowData + len, tsSuperTableUpdateSize); len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); @@ -529,9 +529,9 @@ static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pOper) { return TSDB_CODE_MND_NOT_SUPER_TABLE; } - memcpy(pStable->schema, pOper->rowData + len, schemaSize); + memcpy(pStable->schema, pWMsg->rowData + len, schemaSize); - pOper->pObj = pStable; + pWMsg->pObj = pStable; return TSDB_CODE_SUCCESS; } @@ -828,7 +828,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { } else { mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, tstrerror(code)); - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb}; + SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb}; sdbDeleteRow(&desc); } @@ -878,16 +878,16 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { pMsg->pTable = (STableObj *)pStable; mnodeIncTableRef(pMsg->pTable); - SSWriteMsg oper = { + SSWriteMsg wmsg = { .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, + .pTable = tsSuperTableSdb, .pObj = pStable, .rowSize = sizeof(SSTableObj) + schemaSize, .pMsg = pMsg, - .writeCb = mnodeCreateSuperTableCb + .fpWrite = mnodeCreateSuperTableCb }; - int32_t code = sdbInsertRow(&oper); + int32_t code = sdbInsertRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeDestroySuperTable(pStable); pMsg->pTable = NULL; @@ -937,15 +937,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { mnodeDropAllChildTablesInStable(pStable); } - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeDropSuperTableCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeDropSuperTableCb }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); @@ -1010,15 +1010,15 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t mInfo("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, schema[0].name); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeAddSuperTableTagCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeAddSuperTableTagCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { @@ -1044,15 +1044,15 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeDropSuperTableTagCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeDropSuperTableTagCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { @@ -1088,15 +1088,15 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c mInfo("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, oldTagName, newTagName); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeModifySuperTableTagNameCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeModifySuperTableTagNameCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) { @@ -1162,15 +1162,15 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeAddSuperTableColumnCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeAddSuperTableColumnCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { @@ -1207,15 +1207,15 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeDropSuperTableColumnCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeDropSuperTableColumnCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { @@ -1251,15 +1251,15 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char mInfo("app:%p:%p, stable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, oldName, newName); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsSuperTableSdb, - .pObj = pStable, - .pMsg = pMsg, - .writeCb = mnodeChangeSuperTableColumnCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pObj = pStable, + .pMsg = pMsg, + .fpWrite = mnodeChangeSuperTableColumnCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } // show super tables @@ -1417,12 +1417,12 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) { if (pTable == NULL) break; if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) { - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsSuperTableSdb, - .pObj = pTable, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsSuperTableSdb, + .pObj = pTable, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfTables ++; } @@ -1694,7 +1694,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { } else { mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code)); - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; + SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsChildTableSdb}; sdbDeleteRow(&desc); return code; } @@ -1781,11 +1781,11 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { mnodeIncTableRef(pMsg->pTable); SSWriteMsg desc = { - .type = SDB_OPER_GLOBAL, - .pObj = pTable, - .table = tsChildTableSdb, - .pMsg = pMsg, - .reqFp = mnodeDoCreateChildTableFp + .type = SDB_OPER_GLOBAL, + .pObj = pTable, + .pTable = tsChildTableSdb, + .pMsg = pMsg, + .fpReq = mnodeDoCreateChildTableFp }; int32_t code = sdbInsertRow(&desc); @@ -1901,15 +1901,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_APP_ERROR; } - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable, - .pMsg = pMsg, - .writeCb = mnodeDropChildTableCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pObj = pTable, + .pMsg = pMsg, + .fpWrite = mnodeDropChildTableCb }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, tstrerror(code)); @@ -2005,15 +2005,15 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable, - .pMsg = pMsg, - .writeCb = mnodeAlterNormalTableColumnCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pObj = pTable, + .pMsg = pMsg, + .fpWrite = mnodeAlterNormalTableColumnCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { @@ -2038,15 +2038,15 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable, - .pMsg = pMsg, - .writeCb = mnodeAlterNormalTableColumnCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pObj = pTable, + .pMsg = pMsg, + .fpWrite = mnodeAlterNormalTableColumnCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { @@ -2075,15 +2075,15 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char mInfo("app:%p:%p, ctable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, oldName, newName); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable, - .pMsg = pMsg, - .writeCb = mnodeAlterNormalTableColumnCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pObj = pTable, + .pMsg = pMsg, + .fpWrite = mnodeAlterNormalTableColumnCb }; - return sdbUpdateRow(&oper); + return sdbUpdateRow(&wmsg); } static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTable) { @@ -2218,12 +2218,12 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) { if (pTable == NULL) break; if (pTable->vgId == pVgroup->vgId) { - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsChildTableSdb, - .pObj = pTable, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsChildTableSdb, + .pObj = pTable, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfTables++; } mnodeDecTableRef(pTable); @@ -2251,12 +2251,12 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) { if (pTable == NULL) break; if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) { - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsChildTableSdb, - .pObj = pTable, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsChildTableSdb, + .pObj = pTable, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfTables++; } mnodeDecTableRef(pTable); @@ -2280,12 +2280,12 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) { if (pTable == NULL) break; if (pTable->superTable == pStable) { - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsChildTableSdb, - .pObj = pTable, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsChildTableSdb, + .pObj = pTable, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfTables++; } @@ -2411,11 +2411,11 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { SSWriteMsg desc = { - .type = SDB_OPER_GLOBAL, - .pObj = pTable, - .table = tsChildTableSdb, - .pMsg = mnodeMsg, - .writeCb = mnodeDoCreateChildTableCb + .type = SDB_OPER_GLOBAL, + .pObj = pTable, + .pTable = tsChildTableSdb, + .pMsg = mnodeMsg, + .fpWrite = mnodeDoCreateChildTableCb }; int32_t code = sdbInsertRowImp(&desc); @@ -2440,8 +2440,8 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle, mnodeMsg->incomingTs, sec, mnodeMsg->retry); - SSWriteMsg oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable}; - sdbDeleteRow(&oper); + SSWriteMsg wmsg = {.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pObj = pTable}; + sdbDeleteRow(&wmsg); if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) { //Avoid retry again in client diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index edd0839b0c344be8db464fae024711f6751547a1..95d5befa5a92505e9fce43d190777940e0cca91d 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -42,13 +42,13 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg); -static int32_t mnodeUserActionDestroy(SSWriteMsg *pOper) { - tfree(pOper->pObj); +static int32_t mnodeUserActionDestroy(SSWriteMsg *pWMsg) { + tfree(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionInsert(SSWriteMsg *pOper) { - SUserObj *pUser = pOper->pObj; +static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) { + SUserObj *pUser = pWMsg->pObj; SAcctObj *pAcct = mnodeGetAcct(pUser->acct); if (pAcct != NULL) { @@ -62,8 +62,8 @@ static int32_t mnodeUserActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionDelete(SSWriteMsg *pOper) { - SUserObj *pUser = pOper->pObj; +static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) { + SUserObj *pUser = pWMsg->pObj; SAcctObj *pAcct = mnodeGetAcct(pUser->acct); if (pAcct != NULL) { @@ -74,8 +74,8 @@ static int32_t mnodeUserActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionUpdate(SSWriteMsg *pOper) { - SUserObj *pUser = pOper->pObj; +static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) { + SUserObj *pUser = pWMsg->pObj; SUserObj *pSaved = mnodeGetUser(pUser->user); if (pUser != pSaved) { memcpy(pSaved, pUser, tsUserUpdateSize); @@ -85,19 +85,19 @@ static int32_t mnodeUserActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionEncode(SSWriteMsg *pOper) { - SUserObj *pUser = pOper->pObj; - memcpy(pOper->rowData, pUser, tsUserUpdateSize); - pOper->rowSize = tsUserUpdateSize; +static int32_t mnodeUserActionEncode(SSWriteMsg *pWMsg) { + SUserObj *pUser = pWMsg->pObj; + memcpy(pWMsg->rowData, pUser, tsUserUpdateSize); + pWMsg->rowSize = tsUserUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeUserActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeUserActionDecode(SSWriteMsg *pWMsg) { SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj)); if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pUser, pOper->rowData, tsUserUpdateSize); - pOper->pObj = pUser; + memcpy(pUser, pWMsg->rowData, tsUserUpdateSize); + pWMsg->pObj = pUser; return TSDB_CODE_SUCCESS; } @@ -205,14 +205,14 @@ void mnodeDecUserRef(SUserObj *pUser) { } static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsUserSdb, - .pObj = pUser, - .pMsg = pMsg + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsUserSdb, + .pObj = pUser, + .pMsg = pMsg }; - int32_t code = sdbUpdateRow(&oper); + int32_t code = sdbUpdateRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); } else { @@ -259,15 +259,15 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { pUser->superAuth = 1; } - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsUserSdb, - .pObj = pUser, - .rowSize = sizeof(SUserObj), - .pMsg = pMsg + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsUserSdb, + .pObj = pUser, + .rowSize = sizeof(SUserObj), + .pMsg = pMsg }; - code = sdbInsertRow(&oper); + code = sdbInsertRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); tfree(pUser); @@ -279,14 +279,14 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { } static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsUserSdb, - .pObj = pUser, - .pMsg = pMsg + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsUserSdb, + .pObj = pUser, + .pMsg = pMsg }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); } else { @@ -562,12 +562,12 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { if (pUser == NULL) break; if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) { - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsUserSdb, - .pObj = pUser, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsUserSdb, + .pObj = pUser, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfUsers++; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index da0ed1cd36517f9768be4575b778c6f49242c08f..5a99dfbfa396283e00f7b236f0e5968a6a73b340 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -72,13 +72,13 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) { tfree(pVgroup); } -static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pOper) { - mnodeDestroyVgroup(pOper->pObj); +static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pWMsg) { + mnodeDestroyVgroup(pWMsg->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionInsert(SSWriteMsg *pOper) { - SVgObj *pVgroup = pOper->pObj; +static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) { + SVgObj *pVgroup = pWMsg->pObj; // refer to db SDbObj *pDb = mnodeGetDb(pVgroup->dbName); @@ -115,8 +115,8 @@ static int32_t mnodeVgroupActionInsert(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionDelete(SSWriteMsg *pOper) { - SVgObj *pVgroup = pOper->pObj; +static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) { + SVgObj *pVgroup = pWMsg->pObj; if (pVgroup->pDb == NULL) { mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName); @@ -137,8 +137,8 @@ static int32_t mnodeVgroupActionDelete(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pOper) { - SVgObj *pNew = pOper->pObj; +static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) { + SVgObj *pNew = pWMsg->pObj; SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId); if (pVgroup != pNew) { @@ -176,25 +176,25 @@ static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionEncode(SSWriteMsg *pOper) { - SVgObj *pVgroup = pOper->pObj; - memcpy(pOper->rowData, pVgroup, tsVgUpdateSize); - SVgObj *pTmpVgroup = pOper->rowData; +static int32_t mnodeVgroupActionEncode(SSWriteMsg *pWMsg) { + SVgObj *pVgroup = pWMsg->pObj; + memcpy(pWMsg->rowData, pVgroup, tsVgUpdateSize); + SVgObj *pTmpVgroup = pWMsg->rowData; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { pTmpVgroup->vnodeGid[i].pDnode = NULL; pTmpVgroup->vnodeGid[i].role = 0; } - pOper->rowSize = tsVgUpdateSize; + pWMsg->rowSize = tsVgUpdateSize; return TSDB_CODE_SUCCESS; } -static int32_t mnodeVgroupActionDecode(SSWriteMsg *pOper) { +static int32_t mnodeVgroupActionDecode(SSWriteMsg *pWMsg) { SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj)); if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pVgroup, pOper->rowData, tsVgUpdateSize); - pOper->pObj = pVgroup; + memcpy(pVgroup, pWMsg->rowData, tsVgUpdateSize); + pWMsg->pObj = pVgroup; return TSDB_CODE_SUCCESS; } @@ -253,13 +253,13 @@ SVgObj *mnodeGetVgroup(int32_t vgId) { } void mnodeUpdateVgroup(SVgObj *pVgroup) { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup }; - int32_t code = sdbUpdateRow(&oper); + int32_t code = sdbUpdateRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } @@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, tstrerror(code)); - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; sdbDeleteRow(&desc); return code; } else { mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); pVgroup->status = TAOS_VG_STATUS_READY; - SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; (void)sdbUpdateRow(&desc); dnodeReprocessMWriteMsg(pMsg); @@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { // mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, // pDb->name, pVgroup->numOfVnodes); // pVgroup->status = TAOS_VG_STATUS_READY; - // SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + // SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; // (void)sdbUpdateRow(&desc); // dnodeReprocessMWriteMsg(pMsg); // return TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -571,16 +571,16 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { pMsg->pVgroup = pVgroup; mnodeIncVgroupRef(pVgroup); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup, - .rowSize = sizeof(SVgObj), - .pMsg = pMsg, - .reqFp = mnodeCreateVgroupFp + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup, + .rowSize = sizeof(SVgObj), + .pMsg = pMsg, + .fpReq = mnodeCreateVgroupFp }; - code = sdbInsertRow(&oper); + code = sdbInsertRow(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { pMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); @@ -595,12 +595,12 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { } else { mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mnodeSendDropVgroupMsg(pVgroup, NULL); - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); } } @@ -957,28 +957,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received == mnodeMsg->successed) { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup, - .rowSize = sizeof(SVgObj), - .pMsg = mnodeMsg, - .writeCb = mnodeCreateVgroupCb + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup, + .rowSize = sizeof(SVgObj), + .pMsg = mnodeMsg, + .fpWrite = mnodeCreateVgroupCb }; - int32_t code = sdbInsertRowImp(&oper); + int32_t code = sdbInsertRowImp(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); dnodeSendRpcMWriteRsp(mnodeMsg, code); } } else { - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code); } } @@ -1031,12 +1031,12 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received != mnodeMsg->expected) return; - SSWriteMsg oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup + SSWriteMsg wmsg = { + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup }; - int32_t code = sdbDeleteRow(&oper); + int32_t code = sdbDeleteRow(&wmsg); if (code != 0) { code = TSDB_CODE_MND_SDB_ERROR; } @@ -1084,12 +1084,12 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) { if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) { mnodeDropAllChildTablesInVgroups(pVgroup); - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsVgroupSdb, - .pObj = pVgroup, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfVgroups++; } mnodeDecVgroupRef(pVgroup); @@ -1135,12 +1135,12 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) { if (pVgroup == NULL) break; if (pVgroup->pDb == pDropDb) { - SSWriteMsg oper = { - .type = SDB_OPER_LOCAL, - .table = tsVgroupSdb, - .pObj = pVgroup, + SSWriteMsg wmsg = { + .type = SDB_OPER_LOCAL, + .pTable = tsVgroupSdb, + .pObj = pVgroup, }; - sdbDeleteRow(&oper); + sdbDeleteRow(&wmsg); numOfVgroups++; } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index e26778e86b9876ab6e6f47f4b41207ea35cafbf5..591d7749ea485a26c7c9f45e0522e55cdd3fa79b 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -113,22 +113,22 @@ echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG echo "sdbDebugFlag 143" >> $TAOS_CFG -echo "dDebugFlag 143" >> $TAOS_CFG -echo "vDebugFlag 143" >> $TAOS_CFG -echo "tsdbDebugFlag 143" >> $TAOS_CFG -echo "cDebugFlag 143" >> $TAOS_CFG -echo "jnidebugFlag 143" >> $TAOS_CFG -echo "odbcdebugFlag 143" >> $TAOS_CFG -echo "httpDebugFlag 143" >> $TAOS_CFG -echo "monitorDebugFlag 143" >> $TAOS_CFG -echo "mqttDebugFlag 143" >> $TAOS_CFG -echo "qdebugFlag 143" >> $TAOS_CFG -echo "rpcDebugFlag 143" >> $TAOS_CFG +echo "dDebugFlag 131" >> $TAOS_CFG +echo "vDebugFlag 131" >> $TAOS_CFG +echo "tsdbDebugFlag 131" >> $TAOS_CFG +echo "cDebugFlag 131" >> $TAOS_CFG +echo "jnidebugFlag 131" >> $TAOS_CFG +echo "odbcdebugFlag 131" >> $TAOS_CFG +echo "httpDebugFlag 131" >> $TAOS_CFG +echo "monitorDebugFlag 131" >> $TAOS_CFG +echo "mqttDebugFlag 131" >> $TAOS_CFG +echo "qdebugFlag 131" >> $TAOS_CFG +echo "rpcDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG -echo "udebugFlag 143" >> $TAOS_CFG +echo "udebugFlag 131" >> $TAOS_CFG echo "sdebugFlag 143" >> $TAOS_CFG echo "wdebugFlag 143" >> $TAOS_CFG -echo "cqdebugFlag 143" >> $TAOS_CFG +echo "cqdebugFlag 131" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG @@ -140,7 +140,7 @@ echo "clog 2" >> $TAOS_CFG #echo "cache 1" >> $TAOS_CFG echo "days 10" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG -echo "maxVgroupsPerDb 4" >> $TAOS_CFG +echo "maxVgroupsPerDb 10" >> $TAOS_CFG echo "minTablesPerVnode 4" >> $TAOS_CFG echo "maxTablesPerVnode 1000" >> $TAOS_CFG echo "tableIncStepPerVnode 10000" >> $TAOS_CFG