提交 07ca8eb8 编写于 作者: dengyihao's avatar dengyihao

enh: add trace to transaction

上级 3ac11ce8
...@@ -176,6 +176,7 @@ typedef struct { ...@@ -176,6 +176,7 @@ typedef struct {
char opername[TSDB_TRANS_OPER_LEN]; char opername[TSDB_TRANS_OPER_LEN];
SArray* pRpcArray; SArray* pRpcArray;
SRWLatch lockRpcArray; SRWLatch lockRpcArray;
int64_t mTraceId;
} STrans; } STrans;
typedef struct { typedef struct {
......
...@@ -52,6 +52,8 @@ typedef struct { ...@@ -52,6 +52,8 @@ typedef struct {
int32_t contLen; int32_t contLen;
void *pCont; void *pCont;
SSdbRaw *pRaw; SSdbRaw *pRaw;
int64_t mTraceId;
} STransAction; } STransAction;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
......
...@@ -1417,7 +1417,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1417,7 +1417,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
return 0; return 0;
} }
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
void *pIter = NULL; void *pIter = NULL;
...@@ -1439,7 +1439,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { ...@@ -1439,7 +1439,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq); tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg); int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) { if (code != 0) {
...@@ -1475,7 +1475,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { ...@@ -1475,7 +1475,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
code = mndTrimDb(pMnode, pDb); code = mndTrimDb(pMnode, pDb, pReq);
_OVER: _OVER:
if (code != 0) { if (code != 0) {
......
...@@ -440,7 +440,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -440,7 +440,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (roleChanged) { if (roleChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) { if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs); mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
pDb->stateTs, curMs);
pDb->stateTs = curMs; pDb->stateTs = curMs;
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
...@@ -954,7 +955,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -954,7 +955,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq); tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle, mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
dcfgReq.config, dcfgReq.value); dcfgReq.config, dcfgReq.value);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen}; SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
tmsgSendReq(&epSet, &rpcMsg); tmsgSendReq(&epSet, &rpcMsg);
code = 0; code = 0;
} }
......
...@@ -470,6 +470,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, ...@@ -470,6 +470,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
return -1; return -1;
} }
action.mTraceId = pTrans->mTraceId;
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE; action.msgType = TDMT_DND_CREATE_VNODE;
...@@ -693,6 +694,8 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { ...@@ -693,6 +694,8 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SMCreateSmaReq createReq = {0}; SMCreateSmaReq createReq = {0};
int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
......
...@@ -668,6 +668,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -668,6 +668,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
} }
STransAction action = {0}; STransAction action = {0};
action.mTraceId = pTrans->mTraceId;
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
...@@ -877,7 +878,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { ...@@ -877,7 +878,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq); tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg); int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) { if (code != 0) {
......
...@@ -434,6 +434,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { ...@@ -434,6 +434,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
STransAction action = {0}; STransAction action = {0};
action.mTraceId = pTrans->mTraceId;
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = buf; action.pCont = buf;
action.contLen = tlen; action.contLen = tlen;
......
...@@ -505,8 +505,8 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) { ...@@ -505,8 +505,8 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
} }
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans,
pTrans->startFunc); mndTransStr(pTrans->stage), pTrans->startFunc);
if (pTrans->startFunc > 0) { if (pTrans->startFunc > 0) {
TransCbFp fp = mndTransGetCbFp(pTrans->startFunc); TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
...@@ -577,8 +577,7 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) { ...@@ -577,8 +577,7 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64, mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), pNew->createdTime);
pNew->createdTime);
if (pOld->createdTime != pNew->createdTime) { if (pOld->createdTime != pNew->createdTime) {
mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64 mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64
...@@ -650,6 +649,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, ...@@ -650,6 +649,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0;
taosInitRWLatch(&pTrans->lockRpcArray); taosInitRWLatch(&pTrans->lockRpcArray);
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
...@@ -706,7 +706,8 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { ...@@ -706,7 +706,8 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
} }
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw}; STransAction action = {
.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw, .mTraceId = pTrans->mTraceId};
return mndTransAppendAction(pTrans->redoActions, &action); return mndTransAppendAction(pTrans->redoActions, &action);
} }
...@@ -728,6 +729,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { ...@@ -728,6 +729,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_REDO_ACTION; pAction->stage = TRN_STAGE_REDO_ACTION;
pAction->actionType = TRANS_ACTION_MSG; pAction->actionType = TRANS_ACTION_MSG;
pAction->mTraceId = pTrans->mTraceId;
return mndTransAppendAction(pTrans->redoActions, pAction); return mndTransAppendAction(pTrans->redoActions, pAction);
} }
...@@ -875,7 +877,6 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) { ...@@ -875,7 +877,6 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
} }
} }
if (mndCheckTransConflict(pMnode, pTrans)) { if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
...@@ -912,6 +913,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -912,6 +913,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->pRpcArray = pTrans->pRpcArray; pNew->pRpcArray = pTrans->pRpcArray;
pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen; pNew->rpcRspLen = pTrans->rpcRspLen;
pNew->mTraceId = pTrans->mTraceId;
pTrans->pRpcArray = NULL; pTrans->pRpcArray = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
...@@ -1167,6 +1169,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio ...@@ -1167,6 +1169,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
rpcMsg.info.traceId.rootId = pTrans->mTraceId;
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
char detail[1024] = {0}; char detail[1024] = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册