From fde22bca3677758354609893a1f354fe22d1e816 Mon Sep 17 00:00:00 2001 From: Shengliang Date: Sun, 23 Jan 2022 21:59:09 -0800 Subject: [PATCH] manage func obj --- include/common/tmsg.h | 5 +- source/dnode/mnode/impl/inc/mndDef.h | 1 - source/dnode/mnode/impl/src/mndFunc.c | 195 +++++++++++++------------ source/dnode/mnode/impl/src/mndStb.c | 11 +- source/dnode/mnode/impl/src/mndTrans.c | 8 +- 5 files changed, 115 insertions(+), 105 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 73fe2b857c..d528f8dfb2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -526,9 +526,9 @@ typedef struct { typedef struct { char name[TSDB_FUNC_NAME_LEN]; + int8_t igExists; int8_t funcType; int8_t scriptType; - int8_t align; int8_t outputType; int32_t outputLen; int32_t bufSize; @@ -540,6 +540,7 @@ typedef struct { typedef struct { char name[TSDB_FUNC_NAME_LEN]; + int8_t igNotExists; } SDropFuncReq; typedef struct { @@ -549,9 +550,9 @@ typedef struct { typedef struct { char name[TSDB_FUNC_NAME_LEN]; + int8_t align; int8_t funcType; int8_t scriptType; - int8_t align; int8_t outputType; int32_t outputLen; int32_t bufSize; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ebbc42f277..d6b2e13fe0 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -289,7 +289,6 @@ typedef struct { int32_t codeSize; char *pComment; char *pCode; - char pData[]; } SFuncObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 402c48403b..15d5ef2d7a 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -25,7 +25,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc); static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw); static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc); -static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc); +static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew); static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate); static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc); static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq); @@ -104,13 +104,11 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { goto FUNC_DECODE_OVER; } - int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN; - SSdbRow *pRow = sdbAllocRow(size); + SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj)); if (pRow == NULL) goto FUNC_DECODE_OVER; SFuncObj *pFunc = sdbGetRowObj(pRow); if (pFunc == NULL) goto FUNC_DECODE_OVER; - char *tmp = (char *)pFunc + sizeof(SFuncObj); int32_t dataPos = 0; SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER) @@ -124,9 +122,15 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER) - SDB_GET_BINARY(pRaw, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize, FUNC_DECODE_OVER) - pFunc->pComment = pFunc->pData; - pFunc->pCode = (pFunc->pData + pFunc->commentSize); + + pFunc->pComment = calloc(1, pFunc->commentSize); + pFunc->pCode = calloc(1, pFunc->codeSize); + if (pFunc->pComment == NULL || pFunc->pCode == NULL) { + goto FUNC_DECODE_OVER; + } + + SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_DECODE_OVER) terrno = 0; @@ -151,114 +155,104 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { return 0; } -static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) { - mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc); +static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { + mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); return 0; } -static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) { - SFuncObj *pFunc = calloc(1, sizeof(SFuncObj) + pCreate->commentSize + pCreate->codeSize); - pFunc->createdTime = taosGetTimestampMs(); - pFunc->funcType = pCreate->funcType; - pFunc->scriptType = pCreate->scriptType; - pFunc->outputType = pCreate->outputType; - pFunc->outputLen = pCreate->outputLen; - pFunc->bufSize = pCreate->bufSize; - pFunc->signature = pCreate->signature; - pFunc->commentSize = pCreate->commentSize; - pFunc->codeSize = pCreate->codeSize; - pFunc->pComment = pFunc->pData; - memcpy(pFunc->pComment, pCreate->pCont, pCreate->commentSize); - pFunc->pCode = pFunc->pData + pCreate->commentSize; - memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize); +static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) { + SSdb *pSdb = pMnode->pSdb; + SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName); + if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_FUNC_NOT_EXIST; + } + return pFunc; +} - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); - if (pTrans == NULL) { - free(pFunc); - mError("func:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; +static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pFunc); +} + +static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) { + int32_t code = -1; + STrans *pTrans = NULL; + + SFuncObj func = {0}; + memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN); + func.createdTime = taosGetTimestampMs(); + func.funcType = pCreate->funcType; + func.scriptType = pCreate->scriptType; + func.outputType = pCreate->outputType; + func.outputLen = pCreate->outputLen; + func.bufSize = pCreate->bufSize; + func.signature = pCreate->signature; + func.commentSize = pCreate->commentSize; + func.codeSize = pCreate->codeSize; + func.pComment = malloc(func.commentSize); + func.pCode = malloc(func.codeSize); + if (func.pCode == NULL || func.pCode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto CREATE_FUNC_OVER; } + memcpy(func.pComment, pCreate->pCont, pCreate->commentSize); + memcpy(func.pCode, pCreate->pCont + pCreate->commentSize, func.codeSize); + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + if (pTrans == NULL) goto CREATE_FUNC_OVER; + mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name); - SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - free(pFunc); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); + SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto CREATE_FUNC_OVER; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto CREATE_FUNC_OVER; - SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - free(pFunc); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); + SSdbRaw *pUndoRaw = mndFuncActionEncode(&func); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto CREATE_FUNC_OVER; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto CREATE_FUNC_OVER; - SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - free(pFunc); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto CREATE_FUNC_OVER; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto CREATE_FUNC_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_FUNC_OVER; - free(pFunc); + code = 0; + +CREATE_FUNC_OVER: + free(func.pCode); + free(func.pComment); mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) { + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); - if (pTrans == NULL) { - mError("func:%s, failed to drop since %s", pFunc->name, terrstr()); - return -1; - } + if (pTrans == NULL) goto DROP_FUNC_OVER; + mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name); SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto DROP_FUNC_OVER; sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto DROP_FUNC_OVER; sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto DROP_FUNC_OVER; sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_FUNC_OVER; + + code = 0; +DROP_FUNC_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) { @@ -273,11 +267,19 @@ static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) { mDebug("func:%s, start to create", pCreate->name); - SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, pCreate->name); + SFuncObj *pFunc = mndAcquireFunc(pMnode->pSdb, pCreate->name); if (pFunc != NULL) { - sdbRelease(pMnode->pSdb, pFunc); - terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST; - mError("func:%s, failed to create since %s", pCreate->name, terrstr()); + mndReleaseFunc(pMnode->pSdb, pFunc); + if (pCreate->igExists) { + mDebug("stb:%s, already exist, ignore exist is set", pCreate->name); + return 0; + } else { + terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST; + mError("func:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + } else if (terrno != TSDB_CODE_MND_FUNC_ALREADY_EXIST) { + mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } @@ -312,7 +314,6 @@ static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) { } int32_t code = mndCreateFunc(pMnode, pReq, pCreate); - if (code != 0) { mError("func:%s, failed to create since %s", pCreate->name, terrstr()); return -1; @@ -333,15 +334,19 @@ static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) { return -1; } - SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, pDrop->name); + SFuncObj *pFunc = mndAcquireFunc(pMnode->pSdb, pDrop->name); if (pFunc == NULL) { - terrno = TSDB_CODE_MND_FUNC_NOT_EXIST; - mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); - return -1; + if (pDrop->igNotExists) { + mDebug("func:%s, not exist, ignore not exist is set", pDrop->name); + return 0; + } else { + terrno = TSDB_CODE_MND_FUNC_NOT_EXIST; + mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); + return -1; + } } int32_t code = mndDropFunc(pMnode, pReq, pFunc); - if (code != 0) { mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 5ed801e3ea..cd1b69d023 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -123,8 +123,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { goto STB_DECODE_OVER; } - int32_t size = sizeof(SStbObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); - SSdbRow *pRow = sdbAllocRow(size); + SSdbRow *pRow = sdbAllocRow(sizeof(SStbObj)); if (pRow == NULL) goto STB_DECODE_OVER; SStbObj *pStb = sdbGetRowObj(pRow); @@ -143,6 +142,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; pStb->pSchema = calloc(totalCols, sizeof(SSchema)); + if (pStb->pSchema == NULL) { + goto STB_DECODE_OVER; + } for (int32_t i = 0; i < totalCols; ++i) { SSchema *pSchema = &pStb->pSchema[i]; @@ -448,7 +450,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr stbObj.pSchema[i].colId = i + 1; } - int32_t code = 0; + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_STB_OVER; @@ -481,7 +483,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); if (pStb != NULL) { - sdbRelease(pMnode->pSdb, pStb); + mndReleaseStb(pMnode->pSdb, pStb); if (pCreate->igExists) { mDebug("stb:%s, already exist, ignore exist is set", pCreate->name); return 0; @@ -492,6 +494,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; } // topic should have different name with stb diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 2301df65d7..03226c7400 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -390,9 +390,11 @@ static void mndTransDropActions(SArray *pArray) { } void mndTransDrop(STrans *pTrans) { - mndTransDropData(pTrans); - mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); - tfree(pTrans); + if (pTrans != NULL) { + mndTransDropData(pTrans); + mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); + tfree(pTrans); + } } static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { -- GitLab