From 0136cae76079862fe08aa313693dd5fb51ebea66 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 27 Apr 2022 10:35:21 +0800 Subject: [PATCH] refactor(cluster): add cb func on transaction start and stop --- include/dnode/mnode/sdb/sdb.h | 29 ++++- source/dnode/mnode/impl/inc/mndDef.h | 8 +- source/dnode/mnode/impl/inc/mndTrans.h | 11 +- source/dnode/mnode/impl/src/mndTrans.c | 79 +++++++++++--- source/dnode/mnode/impl/src/mndUser.c | 145 ++++++++++++------------- source/dnode/mnode/sdb/inc/sdbInt.h | 24 ---- source/dnode/mnode/sdb/src/sdb.c | 2 +- source/dnode/mnode/sdb/src/sdbHash.c | 16 +-- source/dnode/mnode/sdb/src/sdbRaw.c | 4 +- source/dnode/mnode/sdb/src/sdbRow.c | 4 +- 10 files changed, 192 insertions(+), 130 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index d534b0405f..1613339d10 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -18,6 +18,11 @@ #include "os.h" +#include "thash.h" +#include "tlockfree.h" +#include "tlog.h" +#include "tmsg.h" + #ifdef __cplusplus extern "C" { #endif @@ -135,7 +140,7 @@ typedef enum { typedef struct SSdb SSdb; typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); -typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); +typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); @@ -326,9 +331,29 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); SSdbRow *sdbAllocRow(int32_t objSize); -void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow); +void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); void *sdbGetRowObj(SSdbRow *pRow); +typedef struct SSdb { + SMnode *pMnode; + char *currDir; + char *syncDir; + char *tmpDir; + int64_t lastCommitVer; + int64_t curVer; + int64_t tableVer[SDB_MAX]; + int64_t maxId[SDB_MAX]; + EKeyType keyTypes[SDB_MAX]; + SHashObj *hashObjs[SDB_MAX]; + SRWLatch locks[SDB_MAX]; + SdbInsertFp insertFps[SDB_MAX]; + SdbUpdateFp updateFps[SDB_MAX]; + SdbDeleteFp deleteFps[SDB_MAX]; + SdbDeployFp deployFps[SDB_MAX]; + SdbEncodeFp encodeFps[SDB_MAX]; + SdbDecodeFp decodeFps[SDB_MAX]; +} SSdb; + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 36a8173adf..b1a33e54aa 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -126,8 +126,6 @@ typedef enum { DND_REASON_OTHERS } EDndReason; -typedef void (*TransCbFp)(SMnode* pMnode, void* param); - typedef struct { int32_t id; ETrnStage stage; @@ -150,8 +148,10 @@ typedef struct { int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; - TransCbFp transCbFp; - void* transCbParam; + int32_t startFunc; + int32_t stopFunc; + int32_t paramLen; + void* param; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 2fcc82e861..5b5aff7c86 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -33,6 +33,15 @@ typedef struct { void *pCont; } STransAction; +typedef enum { + TEST_TRANS_START_FUNC = 1, + TEST_TRANS_STOP_FUNC = 2, + CONSUME_TRANS_START_FUNC = 3, + CONSUME_TRANS_STOP_FUNC = 4, +} ETrnFuncType; + +typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); + int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); @@ -44,7 +53,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); -void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param); +void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 582b60f7b2..c08b0f6db9 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -29,7 +29,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); -static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); +static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); @@ -174,6 +174,13 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) } + SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, TRANS_ENCODE_OVER) + if (pTrans->param != NULL) { + SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_ENCODE_OVER) + } + SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER) @@ -305,6 +312,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { action.pCont = NULL; } + SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->paramLen, TRANS_DECODE_OVER) + if (pTrans->paramLen != 0) { + pTrans->param = taosMemoryMalloc(pTrans->paramLen); + SDB_GET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_DECODE_OVER); + } + SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER) terrno = 0; @@ -413,9 +428,36 @@ static const char *mndTransType(ETrnType type) { } } +static void mndTransTestStartFunc(SMnode *pMnode, void *param, int32_t paramLen) { + mInfo("test trans start, param:%s, len:%d", (char *)param, paramLen); +} + +static void mndTransTestStopFunc(SMnode *pMnode, void *param, int32_t paramLen) { + mInfo("test trans stop, param:%s, len:%d", (char *)param, paramLen); +} + +static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) { + switch (ftype) { + case TEST_TRANS_START_FUNC: + return mndTransTestStartFunc; + case TEST_TRANS_STOP_FUNC: + return mndTransTestStopFunc; + default: + return NULL; + } +} + static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { // pTrans->stage = TRN_STAGE_PREPARE; mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); + + if (pTrans->startFunc > 0) { + TransCbFp fp = mndTransGetCbFp(pTrans->startFunc); + if (fp) { + (*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen); + } + } + return 0; } @@ -430,10 +472,23 @@ static void mndTransDropData(STrans *pTrans) { pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; } + if (pTrans->param != NULL) { + taosMemoryFree(pTrans->param); + pTrans->param = NULL; + pTrans->paramLen = 0; + } } -static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); +static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { + mDebug("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), + callFunc); + if (pTrans->stopFunc > 0 && callFunc) { + TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc); + if (fp) { + (*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen); + } + } + mndTransDropData(pTrans); return 0; } @@ -498,7 +553,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S return NULL; } - mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans); + mDebug("trans:%d, local var is created, data:%p", pTrans->id, pTrans); return pTrans; } @@ -525,7 +580,7 @@ static void mndTransDropActions(SArray *pArray) { void mndTransDrop(STrans *pTrans) { if (pTrans != NULL) { mndTransDropData(pTrans); - mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); + mDebug("trans:%d, local var is freed, data:%p", pTrans->id, pTrans); taosMemoryFreeClear(pTrans); } } @@ -574,9 +629,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) { pTrans->rpcRspLen = contLen; } -void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param) { - pTrans->transCbFp = fp; - pTrans->transCbParam = param; +void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen) { + pTrans->startFunc = startFunc; + pTrans->stopFunc = stopFunc; + pTrans->param = param; + pTrans->paramLen = paramLen; } void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { @@ -712,8 +769,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { pNew->rpcRefId = pTrans->rpcRefId; pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRspLen = pTrans->rpcRspLen; - pNew->transCbFp = pTrans->transCbFp; - pNew->transCbParam = pTrans->transCbParam; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; @@ -1125,10 +1180,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); - if (pTrans->transCbFp != NULL) { - (*pTrans->transCbFp)(pMnode, pTrans->transCbParam); - } - return continueExec; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d6876782f6..1500b3d7d8 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -96,36 +96,36 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN; SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size); - if (pRaw == NULL) goto USER_ENCODE_OVER; + if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pUser->createdTime, USER_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pUser->updateTime, USER_ENCODE_OVER) - SDB_SET_INT8(pRaw, dataPos, pUser->superUser, USER_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, USER_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, USER_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, _OVER) + SDB_SET_INT64(pRaw, dataPos, pUser->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pUser->updateTime, _OVER) + SDB_SET_INT8(pRaw, dataPos, pUser->superUser, _OVER) + SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER) + SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER) char *db = taosHashIterate(pUser->readDbs, NULL); while (db != NULL) { - SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER); db = taosHashIterate(pUser->readDbs, db); } db = taosHashIterate(pUser->writeDbs, NULL); while (db != NULL) { - SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER); db = taosHashIterate(pUser->writeDbs, db); } - SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_ENCODE_OVER) - SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) + SDB_SET_DATALEN(pRaw, dataPos, _OVER) terrno = 0; -USER_ENCODE_OVER: +_OVER: if (terrno != 0) { mError("user:%s, failed to encode to raw:%p since %s", pUser->user, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -140,55 +140,54 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; if (sver != USER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto USER_DECODE_OVER; + goto _OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); - if (pRow == NULL) goto USER_DECODE_OVER; + if (pRow == NULL) goto _OVER; SUserObj *pUser = sdbGetRowObj(pRow); - if (pUser == NULL) goto USER_DECODE_OVER; - - pUser->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true); - pUser->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true); - if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto USER_DECODE_OVER; + if (pUser == NULL) goto _OVER; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_DECODE_OVER) - SDB_GET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_DECODE_OVER) - SDB_GET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pUser->createdTime, USER_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pUser->updateTime, USER_DECODE_OVER) - SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, USER_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pUser->createdTime, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pUser->updateTime, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, _OVER) int32_t numOfReadDbs = 0; int32_t numOfWriteDbs = 0; - SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, USER_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, USER_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER) + SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER) + pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true); + pUser->writeDbs = taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true); + if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto _OVER; for (int32_t i = 0; i < numOfReadDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER) int32_t len = strlen(db) + 1; taosHashPut(pUser->readDbs, db, len, db, TSDB_DB_FNAME_LEN); } for (int32_t i = 0; i < numOfWriteDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER) int32_t len = strlen(db) + 1; taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN); } - SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) terrno = 0; -USER_DECODE_OVER: +_OVER: if (terrno != 0) { mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr()); taosHashCleanup(pUser->readDbs); @@ -220,6 +219,8 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser); taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->writeDbs); + pUser->readDbs = NULL; + pUser->writeDbs = NULL; return 0; } @@ -228,13 +229,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); pOld->updateTime = pNew->updateTime; - void *tmp1 = pOld->readDbs; - pOld->readDbs = pNew->readDbs; - pNew->readDbs = tmp1; - - void *tmp2 = pOld->writeDbs; - pOld->writeDbs = pNew->writeDbs; - pNew->writeDbs = tmp2; + TSWAP(pOld->readDbs, pNew->readDbs, (void *)); + TSWAP(pOld->writeDbs, pNew->writeDbs, (void *)); return 0; } @@ -277,6 +273,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate } sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + char *param = strdup("====> test code to be deleted later <====="); + mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1); + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); @@ -296,41 +295,41 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq) { if (tDeserializeSCreateUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto CREATE_USER_OVER; + goto _OVER; } mDebug("user:%s, start to create", createReq.user); if (createReq.user[0] == 0) { terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; - goto CREATE_USER_OVER; + goto _OVER; } if (createReq.pass[0] == 0) { terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT; - goto CREATE_USER_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, createReq.user); if (pUser != NULL) { terrno = TSDB_CODE_MND_USER_ALREADY_EXIST; - goto CREATE_USER_OVER; + goto _OVER; } pOperUser = mndAcquireUser(pMnode, pReq->user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto CREATE_USER_OVER; + goto _OVER; } if (mndCheckCreateUserAuth(pOperUser) != 0) { - goto CREATE_USER_OVER; + goto _OVER; } code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -CREATE_USER_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to create since %s", createReq.user, terrstr()); } @@ -399,38 +398,38 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { if (tDeserializeSAlterUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto ALTER_USER_OVER; + goto _OVER; } mDebug("user:%s, start to alter", alterReq.user); if (alterReq.user[0] == 0) { terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; - goto ALTER_USER_OVER; + goto _OVER; } if (alterReq.pass[0] == 0) { terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT; - goto ALTER_USER_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, alterReq.user); if (pUser == NULL) { terrno = TSDB_CODE_MND_USER_NOT_EXIST; - goto ALTER_USER_OVER; + goto _OVER; } pOperUser = mndAcquireUser(pMnode, pReq->user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto ALTER_USER_OVER; + goto _OVER; } memcpy(&newUser, pUser, sizeof(SUserObj)); newUser.readDbs = mndDupDbHash(pUser->readDbs); newUser.writeDbs = mndDupDbHash(pUser->writeDbs); if (newUser.readDbs == NULL || newUser.writeDbs == NULL) { - goto ALTER_USER_OVER; + goto _OVER; } int32_t len = strlen(alterReq.dbname) + 1; @@ -446,50 +445,50 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) { if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto ALTER_USER_OVER; + goto _OVER; } if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto ALTER_USER_OVER; + goto _OVER; } } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) { if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto ALTER_USER_OVER; + goto _OVER; } } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { taosHashClear(newUser.readDbs); } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) { if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto ALTER_USER_OVER; + goto _OVER; } if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto ALTER_USER_OVER; + goto _OVER; } } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto ALTER_USER_OVER; + goto _OVER; } } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) { taosHashClear(newUser.writeDbs); } else { terrno = TSDB_CODE_MND_INVALID_ALTER_OPER; - goto ALTER_USER_OVER; + goto _OVER; } newUser.updateTime = taosGetTimestampMs(); if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) { - goto ALTER_USER_OVER; + goto _OVER; } code = mndUpdateUser(pMnode, pUser, &newUser, pReq); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -ALTER_USER_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to alter since %s", alterReq.user, terrstr()); } @@ -537,36 +536,36 @@ static int32_t mndProcessDropUserReq(SNodeMsg *pReq) { if (tDeserializeSDropUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto DROP_USER_OVER; + goto _OVER; } mDebug("user:%s, start to drop", dropReq.user); if (dropReq.user[0] == 0) { terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; - goto DROP_USER_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, dropReq.user); if (pUser == NULL) { terrno = TSDB_CODE_MND_USER_NOT_EXIST; - goto DROP_USER_OVER; + goto _OVER; } pOperUser = mndAcquireUser(pMnode, pReq->user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto DROP_USER_OVER; + goto _OVER; } if (mndCheckDropUserAuth(pOperUser) != 0) { - goto DROP_USER_OVER; + goto _OVER; } code = mndDropUser(pMnode, pReq, pUser); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -DROP_USER_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("user:%s, failed to drop since %s", dropReq.user, terrstr()); } @@ -586,7 +585,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { if (tDeserializeSGetUserAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto GET_AUTH_OVER; + goto _OVER; } mTrace("user:%s, start to get auth", authReq.user); @@ -594,7 +593,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { pUser = mndAcquireUser(pMnode, authReq.user); if (pUser == NULL) { terrno = TSDB_CODE_MND_USER_NOT_EXIST; - goto GET_AUTH_OVER; + goto _OVER; } memcpy(authRsp.user, pUser->user, TSDB_USER_LEN); @@ -622,7 +621,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { void *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto GET_AUTH_OVER; + goto _OVER; } tSerializeSGetUserAuthRsp(pRsp, contLen, &authRsp); @@ -631,7 +630,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { pReq->rspLen = contLen; code = 0; -GET_AUTH_OVER: +_OVER: mndReleaseUser(pMnode, pUser); taosHashCleanup(authRsp.readDbs); taosHashCleanup(authRsp.writeDbs); diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index d268b93051..de71ac47b6 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -19,10 +19,6 @@ #include "os.h" #include "sdb.h" -#include "thash.h" -#include "tlockfree.h" -#include "tlog.h" -#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -53,26 +49,6 @@ typedef struct SSdbRow { char pObj[]; } SSdbRow; -typedef struct SSdb { - SMnode *pMnode; - char *currDir; - char *syncDir; - char *tmpDir; - int64_t lastCommitVer; - int64_t curVer; - int64_t tableVer[SDB_MAX]; - int64_t maxId[SDB_MAX]; - EKeyType keyTypes[SDB_MAX]; - SHashObj *hashObjs[SDB_MAX]; - SRWLatch locks[SDB_MAX]; - SdbInsertFp insertFps[SDB_MAX]; - SdbUpdateFp updateFps[SDB_MAX]; - SdbDeleteFp deleteFps[SDB_MAX]; - SdbDeployFp deployFps[SDB_MAX]; - SdbEncodeFp encodeFps[SDB_MAX]; - SdbDecodeFp decodeFps[SDB_MAX]; -} SSdb; - const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index bc878d7b31..f85ff7a26e 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -87,7 +87,7 @@ void sdbCleanup(SSdb *pSdb) { SSdbRow *pRow = *ppRow; if (pRow == NULL) continue; - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, true); ppRow = taosHashIterate(hash, ppRow); } } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 1158bedc21..b34c2a82d8 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -137,7 +137,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); if (pOldRow != NULL) { taosWUnLockLatch(pLock); - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; return terrno; } @@ -148,7 +148,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { taosWUnLockLatch(pLock); - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; return terrno; } @@ -164,7 +164,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosWLockLatch(pLock); taosHashRemove(hash, pRow->pObj, keySize); taosWUnLockLatch(pLock); - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, false); terrno = code; return terrno; } @@ -202,7 +202,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); } - sdbFreeRow(pSdb, pNewRow); + sdbFreeRow(pSdb, pNewRow, false); pSdb->tableVer[pOldRow->type]++; return code; @@ -215,7 +215,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { taosWUnLockLatch(pLock); - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return terrno; } @@ -228,7 +228,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosWUnLockLatch(pLock); pSdb->tableVer[pOldRow->type]++; - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, false); sdbCheck(pSdb, pOldRow); // sdbRelease(pSdb, pOldRow->pObj); @@ -322,7 +322,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { int32_t ref = atomic_load_32(&pRow->refCount); sdbPrintOper(pSdb, pRow, "check"); if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, true); } taosRUnLockLatch(pLock); @@ -340,7 +340,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); sdbPrintOper(pSdb, pRow, "release"); if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { - sdbFreeRow(pSdb, pRow); + sdbFreeRow(pSdb, pRow, true); } taosRUnLockLatch(pLock); diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 326fe53fc7..45464c3bd3 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -107,7 +107,9 @@ int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_ return -1; } - memcpy(pRaw->pData + dataPos, pVal, valLen); + if (pVal != NULL) { + memcpy(pRaw->pData + dataPos, pVal, valLen); + } return 0; } diff --git a/source/dnode/mnode/sdb/src/sdbRow.c b/source/dnode/mnode/sdb/src/sdbRow.c index 94f87cb350..43f70cb245 100644 --- a/source/dnode/mnode/sdb/src/sdbRow.c +++ b/source/dnode/mnode/sdb/src/sdbRow.c @@ -36,11 +36,11 @@ void *sdbGetRowObj(SSdbRow *pRow) { return pRow->pObj; } -void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow) { +void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc) { // remove attached object such as trans SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type]; if (deleteFp != NULL) { - (*deleteFp)(pSdb, pRow->pObj); + (*deleteFp)(pSdb, pRow->pObj, callFunc); } sdbPrintOper(pSdb, pRow, "free"); -- GitLab