From 07ca8eb8e063e982d99bfb5a1319b2a6e999edc1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Mar 2023 10:25:46 +0000 Subject: [PATCH] enh: add trace to transaction --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 2 ++ source/dnode/mnode/impl/src/mndDb.c | 6 +++--- source/dnode/mnode/impl/src/mndDnode.c | 5 +++-- source/dnode/mnode/impl/src/mndSma.c | 3 +++ source/dnode/mnode/impl/src/mndStb.c | 3 ++- source/dnode/mnode/impl/src/mndStream.c | 1 + source/dnode/mnode/impl/src/mndTrans.c | 18 +++++++++++------- 8 files changed, 26 insertions(+), 13 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ebe96fd740..1feb292779 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -176,6 +176,7 @@ typedef struct { char opername[TSDB_TRANS_OPER_LEN]; SArray* pRpcArray; SRWLatch lockRpcArray; + int64_t mTraceId; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index d6e5d322ba..057e3efbbc 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -52,6 +52,8 @@ typedef struct { int32_t contLen; void *pCont; SSdbRaw *pRaw; + + int64_t mTraceId; } STransAction; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e848a81d40..8b09c823ab 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1417,7 +1417,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, return 0; } -static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { +static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; @@ -1439,7 +1439,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { pHead->vgId = htonl(pVgroup->vgId); tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq); - SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen}; + SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen, .info = pReq->info}; SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); int32_t code = tmsgSendReq(&epSet, &rpcMsg); if (code != 0) { @@ -1475,7 +1475,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { goto _OVER; } - code = mndTrimDb(pMnode, pDb); + code = mndTrimDb(pMnode, pDb, pReq); _OVER: if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1d9db37a7d..d4a84759c1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -440,7 +440,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (roleChanged) { SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); if (pDb != NULL && pDb->stateTs != curMs) { - mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs); + mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, + pDb->stateTs, curMs); pDb->stateTs = curMs; } mndReleaseDb(pMnode, pDb); @@ -954,7 +955,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq); mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle, dcfgReq.config, dcfgReq.value); - SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen}; + SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; tmsgSendReq(&epSet, &rpcMsg); code = 0; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 09aef2cd96..3d654a23d8 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -470,6 +470,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, return -1; } + action.mTraceId = pTrans->mTraceId; action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_DND_CREATE_VNODE; @@ -693,6 +694,8 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { SDbObj *pDb = NULL; SMCreateSmaReq createReq = {0}; + int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId); + if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 2a369a863a..ac8a35c237 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -668,6 +668,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj } STransAction action = {0}; + action.mTraceId = pTrans->mTraceId; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pReq; action.contLen = contLen; @@ -877,7 +878,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { pHead->vgId = htonl(pVgroup->vgId); tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq); - SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; + SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info}; SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); int32_t code = tmsgSendReq(&epSet, &rpcMsg); if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b7f80f6b0e..ff759f5e78 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -434,6 +434,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { tEncoderClear(&encoder); STransAction action = {0}; + action.mTraceId = pTrans->mTraceId; memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); action.pCont = buf; action.contLen = tlen; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 39b4252618..4aaed29fc3 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -505,8 +505,8 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) { } static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { - mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), - pTrans->startFunc); + mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, + mndTransStr(pTrans->stage), pTrans->startFunc); if (pTrans->startFunc > 0) { TransCbFp fp = mndTransGetCbFp(pTrans->startFunc); @@ -577,8 +577,7 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64, - pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), - pNew->createdTime); + pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), pNew->createdTime); if (pOld->createdTime != pNew->createdTime) { mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64 @@ -650,6 +649,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); + pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; taosInitRWLatch(&pTrans->lockRpcArray); if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || @@ -706,7 +706,8 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw}; + STransAction action = { + .stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw, .mTraceId = pTrans->mTraceId}; return mndTransAppendAction(pTrans->redoActions, &action); } @@ -728,6 +729,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { pAction->stage = TRN_STAGE_REDO_ACTION; pAction->actionType = TRANS_ACTION_MSG; + pAction->mTraceId = pTrans->mTraceId; return mndTransAppendAction(pTrans->redoActions, pAction); } @@ -875,7 +877,6 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) { } } - if (mndCheckTransConflict(pMnode, pTrans)) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); @@ -912,6 +913,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { pNew->pRpcArray = pTrans->pRpcArray; pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRspLen = pTrans->rpcRspLen; + pNew->mTraceId = pTrans->mTraceId; pTrans->pRpcArray = NULL; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; @@ -1167,6 +1169,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + rpcMsg.info.traceId.rootId = pTrans->mTraceId; + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); char detail[1024] = {0}; @@ -1457,7 +1461,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->code = 0; - pTrans->stage = TRN_STAGE_FINISHED; // TRN_STAGE_PRE_FINISH is not necessary + pTrans->stage = TRN_STAGE_FINISHED; // TRN_STAGE_PRE_FINISH is not necessary mInfo("trans:%d, stage from commitAction to finished", pTrans->id); continueExec = true; } else { -- GitLab