diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1ff1165e21a1ab607f6a97cbc8ac05af56625bca..017d77d1c0129c49a0497e68b7c0ec978439318c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -448,6 +448,7 @@ typedef struct { char* pComment; char* pCode; int32_t funcVersion; + SRWLatch lock; } SFuncObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index f0104194d8db3db54bb9141c170bb3fb82123a85..5042e6543fe26e89f90abff3c8d7baeb5fc3a604 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -152,6 +152,8 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) + taosInitRWLatch(&pFunc->lock); + terrno = 0; _OVER: @@ -179,6 +181,44 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); + + taosWLockLatch(&pOld->lock); + + pOld->align = pNew->align; + pOld->bufSize = pNew->bufSize; + pOld->codeSize = pNew->codeSize; + pOld->commentSize = pNew->commentSize; + pOld->createdTime = pNew->createdTime; + pOld->funcType = pNew->funcType; + pOld->funcVersion = pNew->funcVersion; + pOld->outputLen = pNew->outputLen; + pOld->outputType = pNew->outputType; + + if(pOld->pComment != NULL){ + taosMemoryFree(pOld->pComment); + pOld->pComment = NULL; + } + if(pNew->commentSize > 0 && pNew->pComment != NULL){ + pOld->commentSize = pNew->commentSize; + pOld->pComment = taosMemoryMalloc(pOld->commentSize); + memcpy(pOld->pComment, pNew->pComment, pOld->commentSize); + } + + if(pOld->pCode != NULL){ + taosMemoryFree(pOld->pCode); + pOld->pCode = NULL; + } + if(pNew->codeSize > 0 && pNew->pCode != NULL){ + pOld->codeSize = pNew->codeSize; + pOld->pCode = taosMemoryMalloc(pOld->codeSize); + memcpy(pOld->pCode, pNew->pCode, pOld->codeSize); + } + + pOld->scriptType = pNew->scriptType; + pOld->signature = pNew->signature; + + taosWUnLockLatch(&pOld->lock); + return 0; } @@ -229,36 +269,49 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre } memcpy(func.pCode, pCreate->pCode, func.codeSize); - if(pCreate->orReplace == 1){ - SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); - if(oldFunc != NULL){ - func.funcVersion = oldFunc->funcVersion + 1; - mndReleaseFunc(pMnode, oldFunc); - } - } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func"); if (pTrans == NULL) goto _OVER; - mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name); - SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER; + SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); + if(pCreate->orReplace == 1 && oldFunc != NULL){ + func.funcVersion = oldFunc->funcVersion + 1; + + SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) goto _OVER; - SSdbRaw *pUndoRaw = mndFuncActionEncode(&func); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER; + SSdbRaw *pUndoRaw = mndFuncActionEncode(oldFunc); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) goto _OVER; - SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; + SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; + } + else{ + SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER; + + SSdbRaw *pUndoRaw = mndFuncActionEncode(&func); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER; + + SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; + } if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; _OVER: + if(oldFunc != NULL){ + mndReleaseFunc(pMnode, oldFunc); + } + taosMemoryFree(func.pCode); taosMemoryFree(func.pComment); mndTransDrop(pTrans);