提交 4b92876f 编写于 作者: S Shengliang Guan

fix: continue execute transactions after taosd restart

上级 10731f97
......@@ -17,9 +17,9 @@
#include "mndTrans.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSync.h"
#include "mndUser.h"
......@@ -138,6 +138,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
int8_t unused = 0;
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
......@@ -149,14 +150,14 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
......@@ -175,14 +176,14 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
......@@ -201,14 +202,14 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
......@@ -305,6 +306,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
if (pTrans->undoActions == NULL) goto _OVER;
if (pTrans->commitActions == NULL) goto _OVER;
int8_t unused = 0;
for (int32_t i = 0; i < redoActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
......@@ -317,7 +319,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
......@@ -328,8 +330,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
......@@ -353,7 +355,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
......@@ -364,8 +366,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
......@@ -389,7 +391,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
......@@ -400,8 +402,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
......@@ -816,7 +818,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
}
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
}
}
......@@ -825,9 +827,8 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
pTrans->conflict);
} else {
mInfo("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id,
pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
pTrans->conflict);
mInfo("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id, pNew->dbname,
pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, pTrans->conflict);
}
sdbRelease(pMnode->pSdb, pTrans);
}
......@@ -930,7 +931,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
if (pInfo->handle != NULL) {
mInfo("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pInfo->ahandle);
pInfo->ahandle);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
......@@ -1013,8 +1014,8 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
pAction->errCode = pRsp->code;
}
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage),
action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
mndTransExecute(pMnode, pTrans);
_OVER:
......@@ -1030,7 +1031,7 @@ static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pA
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mInfo("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
pAction->id, pAction->epSet.inUse);
pAction->id, pAction->epSet.inUse);
} else {
mInfo("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
}
......@@ -1060,7 +1061,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
pAction->errCode = 0;
code = 0;
mInfo("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
mndSetTransLastAction(pTrans, pAction);
} else {
......@@ -1261,7 +1262,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
pTrans->code = 0;
pTrans->redoActionPos++;
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id);
pAction->id);
code = mndTransSync(pMnode, pTrans);
if (code != 0) {
pTrans->code = terrno;
......@@ -1280,7 +1281,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
terrno = code;
pTrans->code = code;
mInfo("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id,
mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes);
mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes);
break;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册