未验证 提交 691cc93e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13311 from taosdata/fix/mnode

refactor: make trans support multi steps
...@@ -254,6 +254,7 @@ typedef enum ELogicConditionType { ...@@ -254,6 +254,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64 #define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128 #define TSDB_STEP_DESC_LEN 128
......
...@@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dTrace("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pRpc->msgType));
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
if (IsReq(pRpc)) { if (IsReq(pRpc)) {
......
...@@ -60,14 +60,12 @@ typedef enum { ...@@ -60,14 +60,12 @@ typedef enum {
typedef enum { typedef enum {
TRN_STAGE_PREPARE = 0, TRN_STAGE_PREPARE = 0,
TRN_STAGE_REDO_LOG = 1, TRN_STAGE_REDO_ACTION = 1,
TRN_STAGE_REDO_ACTION = 2, TRN_STAGE_ROLLBACK = 2,
TRN_STAGE_ROLLBACK = 3, TRN_STAGE_UNDO_ACTION = 3,
TRN_STAGE_UNDO_ACTION = 4, TRN_STAGE_COMMIT = 4,
TRN_STAGE_UNDO_LOG = 5, TRN_STAGE_COMMIT_ACTION = 5,
TRN_STAGE_COMMIT = 6, TRN_STAGE_FINISHED = 6
TRN_STAGE_COMMIT_LOG = 7,
TRN_STAGE_FINISHED = 8
} ETrnStage; } ETrnStage;
typedef enum { typedef enum {
...@@ -131,7 +129,7 @@ typedef enum { ...@@ -131,7 +129,7 @@ typedef enum {
typedef enum { typedef enum {
TRN_EXEC_PARALLEL = 0, TRN_EXEC_PARALLEL = 0,
TRN_EXEC_ONE_BY_ONE = 1, TRN_EXEC_NO_PARALLEL = 1,
} ETrnExecType; } ETrnExecType;
typedef enum { typedef enum {
...@@ -168,16 +166,16 @@ typedef struct { ...@@ -168,16 +166,16 @@ typedef struct {
SRpcHandleInfo rpcInfo; SRpcHandleInfo rpcInfo;
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
SArray* redoLogs; int32_t redoActionPos;
SArray* undoLogs;
SArray* commitLogs;
SArray* redoActions; SArray* redoActions;
SArray* undoActions; SArray* undoActions;
SArray* commitActions;
int64_t createdTime; int64_t createdTime;
int64_t lastExecTime; int64_t lastExecTime;
int64_t dbUid; int64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN]; char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN]; char lastError[TSDB_TRANS_ERROR_LEN];
char desc[TSDB_TRANS_DESC_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;
......
...@@ -26,31 +26,24 @@ typedef enum { ...@@ -26,31 +26,24 @@ typedef enum {
TRANS_START_FUNC_TEST = 1, TRANS_START_FUNC_TEST = 1,
TRANS_STOP_FUNC_TEST = 2, TRANS_STOP_FUNC_TEST = 2,
TRANS_START_FUNC_MQ_REB = 3, TRANS_START_FUNC_MQ_REB = 3,
TRANS_STOP_FUNC_TEST_MQ_REB = 4, TRANS_STOP_FUNC_MQ_REB = 4,
} ETrnFunc; } ETrnFunc;
typedef struct { typedef struct {
SEpSet epSet; int32_t id;
tmsg_t msgType; int32_t errCode;
int8_t msgSent; int32_t acceptableCode;
int8_t msgReceived; int8_t stage;
int32_t errCode; int8_t isRaw;
int32_t acceptableCode; int8_t rawWritten;
int32_t contLen; int8_t msgSent;
void *pCont; int8_t msgReceived;
} STransAction; tmsg_t msgType;
SEpSet epSet;
typedef struct { int32_t contLen;
void *pCont;
SSdbRaw *pRaw; SSdbRaw *pRaw;
} STransLog; } STransAction;
typedef struct {
ETrnStep stepType;
STransAction redoAction;
STransAction undoAction;
STransLog redoLog;
STransLog undoLog;
} STransStep;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
...@@ -69,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); ...@@ -69,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetExecOneByOne(STrans *pTrans); void mndTransSetNoParallel(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp); void mndTransProcessRsp(SRpcMsg *pRsp);
......
...@@ -78,10 +78,8 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { ...@@ -78,10 +78,8 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("acct:%s, will be created while deploy sdb, raw:%p", acctObj.acct, pRaw); mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr()); mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
...@@ -94,7 +92,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { ...@@ -94,7 +92,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
...@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { ...@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
#endif
} }
static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) {
......
...@@ -172,13 +172,13 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -172,13 +172,13 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
pMnode->clusterId = clusterObj.id; pMnode->clusterId = clusterObj.id;
mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%" PRId64 ", will be created while deploy sdb, raw:%p", clusterObj.id, pRaw); mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
#if 0 #if 0
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
#else #else
......
...@@ -1314,7 +1314,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1314,7 +1314,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName); SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName);
if (pDb == NULL) { if (pDb == NULL) {
mDebug("db:%s, no exist", pDbVgVersion->dbFName); mTrace("db:%s, no exist", pDbVgVersion->dbFName);
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN); memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDbVgVersion->dbId; usedbRsp.uid = pDbVgVersion->dbId;
usedbRsp.vgVersion = -1; usedbRsp.vgVersion = -1;
......
...@@ -98,7 +98,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { ...@@ -98,7 +98,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1; if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1;
mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw); mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw);
#if 0 #if 0
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
...@@ -388,9 +388,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -388,9 +388,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseMnode(pMnode, pObj); mndReleaseMnode(pMnode, pObj);
} }
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); bool dnodeChanged = (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot; bool needCheck = !online || dnodeChanged || reboot;
...@@ -433,7 +434,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -433,7 +434,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (!online) { if (!online) {
mInfo("dnode:%d, from offline to online", pDnode->id); mInfo("dnode:%d, from offline to online", pDnode->id);
} else { } else {
mDebug("dnode:%d, send dnode eps", pDnode->id); mDebug("dnode:%d, send dnode epset, online:%d ver:% " PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
statusReq.dnodeVer, dnodeVer, reboot);
} }
pDnode->rebootTime = statusReq.rebootTime; pDnode->rebootTime = statusReq.rebootTime;
...@@ -441,7 +443,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -441,7 +443,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
SStatusRsp statusRsp = {0}; SStatusRsp statusRsp = {0};
statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); statusRsp.dnodeVer = dnodeVer;
statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.dnodeId = pDnode->id;
statusRsp.dnodeCfg.clusterId = pMnode->clusterId; statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
......
...@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { ...@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
} else if (code == 0) { } else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg); mTrace("msg:%p, successfully processed and response", pMsg);
} else { } else {
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
......
...@@ -90,7 +90,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { ...@@ -90,7 +90,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw); mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
#if 0 #if 0
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
...@@ -367,7 +367,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, ...@@ -367,7 +367,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetExecOneByOne(pTrans); mndTransSetNoParallel(pTrans);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
...@@ -539,7 +539,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { ...@@ -539,7 +539,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mndTransSetExecOneByOne(pTrans); mndTransSetNoParallel(pTrans);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
......
...@@ -507,7 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -507,7 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb); mndTransSetDbInfo(pTrans, pDb);
mndTransSetExecOneByOne(pTrans); mndTransSetNoParallel(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
......
...@@ -1597,7 +1597,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { ...@@ -1597,7 +1597,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
pReq->info.rspLen = rspLen; pReq->info.rspLen = rspLen;
code = 0; code = 0;
mDebug("stb:%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName); mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);
_OVER: _OVER:
if (code != 0) { if (code != 0) {
......
...@@ -501,7 +501,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -501,7 +501,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 4. TODO commit log: modification log // 4. TODO commit log: modification log
// 5. set cb // 5. set cb
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0); mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
// 6. execution // 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
......
...@@ -65,7 +65,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { ...@@ -65,7 +65,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
void mndRestoreFinish(struct SSyncFSM *pFsm) { void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) { if (!pMnode->deploy) {
mInfo("mnode sync restore finished"); mInfo("mnode sync restore finished, and will handle outstanding transactions");
mndTransPullup(pMnode); mndTransPullup(pMnode);
mndSetRestore(pMnode, true); mndSetRestore(pMnode, true);
} else { } else {
...@@ -244,7 +244,7 @@ void mndSyncStart(SMnode *pMnode) { ...@@ -244,7 +244,7 @@ void mndSyncStart(SMnode *pMnode) {
} else { } else {
syncStart(pMgmt->sync); syncStart(pMgmt->sync);
} }
mDebug("sync:%" PRId64 " is started, standby:%d", pMgmt->sync, pMgmt->standby); mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
} }
void mndSyncStop(SMnode *pMnode) {} void mndSyncStop(SMnode *pMnode) {}
......
...@@ -37,19 +37,18 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); ...@@ -37,19 +37,18 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray); static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray); static void mndTransDropActions(SArray *pArray);
static void mndTransDropData(STrans *pTrans); static void mndTransDropData(STrans *pTrans);
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
...@@ -83,40 +82,30 @@ int32_t mndInitTrans(SMnode *pMnode) { ...@@ -83,40 +82,30 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {} void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) { static int32_t mndTransGetActionsSize(SArray *pArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t actionNum = taosArrayGetSize(pArray);
int32_t rawDataLen = 0;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
}
for (int32_t i = 0; i < undoLogNum; ++i) { for (int32_t i = 0; i < actionNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); STransAction *pAction = taosArrayGet(pArray, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); if (pAction->isRaw) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
} else {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
rawDataLen += sizeof(pAction->isRaw);
} }
for (int32_t i = 0; i < commitLogNum; ++i) { return rawDataLen;
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); }
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
}
for (int32_t i = 0; i < redoActionNum; ++i) { static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i); terrno = TSDB_CODE_OUT_OF_MEMORY;
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
for (int32_t i = 0; i < undoActionNum; ++i) { int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
STransAction *pAction = taosArrayGet(pTrans->undoActions, i); rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += (sizeof(STransAction) + pAction->contLen); rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
} rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen); SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen);
if (pRaw == NULL) { if (pRaw == NULL) {
...@@ -126,67 +115,85 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -126,67 +115,85 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER)
ETrnStage stage = pTrans->stage;
if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) {
stage = TRN_STAGE_PREPARE;
} else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) {
stage = TRN_STAGE_ROLLBACK;
} else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) {
stage = TRN_STAGE_COMMIT;
} else {
}
SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER) int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions);
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < redoActionNum; ++i) { for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i); STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
} }
for (int32_t i = 0; i < undoActionNum; ++i) { for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i); STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
} }
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
...@@ -220,11 +227,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -220,11 +227,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
char *pData = NULL; char *pData = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
int8_t sver = 0; int8_t sver = 0;
int32_t redoLogNum = 0;
int32_t undoLogNum = 0;
int32_t commitLogNum = 0;
int32_t redoActionNum = 0; int32_t redoActionNum = 0;
int32_t undoActionNum = 0; int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
int32_t dataPos = 0; int32_t dataPos = 0;
STransAction action = {0}; STransAction action = {0};
...@@ -256,78 +261,105 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -256,78 +261,105 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->type = type; pTrans->type = type;
pTrans->parallel = parallel; pTrans->parallel = parallel;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *));
pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *));
pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *));
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction));
if (pTrans->redoLogs == NULL) goto _OVER;
if (pTrans->undoLogs == NULL) goto _OVER;
if (pTrans->commitLogs == NULL) goto _OVER;
if (pTrans->redoActions == NULL) goto _OVER; if (pTrans->redoActions == NULL) goto _OVER;
if (pTrans->undoActions == NULL) goto _OVER; if (pTrans->undoActions == NULL) goto _OVER;
if (pTrans->commitActions == NULL) goto _OVER;
for (int32_t i = 0; i < redoLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < redoActionNum; ++i) { for (int32_t i = 0; i < redoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
action.pCont = taosMemoryMalloc(action.contLen); SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.pCont == NULL) goto _OVER; if (action.isRaw) {
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pCont = NULL; action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
} }
for (int32_t i = 0; i < undoActionNum; ++i) { for (int32_t i = 0; i < undoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
action.pCont = taosMemoryMalloc(action.contLen); SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.pCont == NULL) goto _OVER; if (action.isRaw) {
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pCont = NULL; action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
} }
SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER)
...@@ -347,7 +379,6 @@ _OVER: ...@@ -347,7 +379,6 @@ _OVER:
mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr()); mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
mndTransDropData(pTrans); mndTransDropData(pTrans);
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
taosMemoryFreeClear(pData);
taosMemoryFreeClear(action.pCont); taosMemoryFreeClear(action.pCont);
return NULL; return NULL;
} }
...@@ -360,20 +391,16 @@ static const char *mndTransStr(ETrnStage stage) { ...@@ -360,20 +391,16 @@ static const char *mndTransStr(ETrnStage stage) {
switch (stage) { switch (stage) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE:
return "prepare"; return "prepare";
case TRN_STAGE_REDO_LOG:
return "redoLog";
case TRN_STAGE_REDO_ACTION: case TRN_STAGE_REDO_ACTION:
return "redoAction"; return "redoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_LOG:
return "commitLog";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_UNDO_LOG:
return "undoLog";
case TRN_STAGE_ROLLBACK: case TRN_STAGE_ROLLBACK:
return "rollback"; return "rollback";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_ACTION:
return "commitAction";
case TRN_STAGE_FINISHED: case TRN_STAGE_FINISHED:
return "finished"; return "finished";
default: default:
...@@ -472,7 +499,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) { ...@@ -472,7 +499,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
return mndTransTestStopFunc; return mndTransTestStopFunc;
case TRANS_START_FUNC_MQ_REB: case TRANS_START_FUNC_MQ_REB:
return mndRebCntInc; return mndRebCntInc;
case TRANS_STOP_FUNC_TEST_MQ_REB: case TRANS_STOP_FUNC_MQ_REB:
return mndRebCntDec; return mndRebCntDec;
default: default:
return NULL; return NULL;
...@@ -493,11 +520,9 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { ...@@ -493,11 +520,9 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
} }
static void mndTransDropData(STrans *pTrans) { static void mndTransDropData(STrans *pTrans) {
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions); mndTransDropActions(pTrans->undoActions);
mndTransDropActions(pTrans->commitActions);
if (pTrans->rpcRsp != NULL) { if (pTrans->rpcRsp != NULL) {
taosMemoryFree(pTrans->rpcRsp); taosMemoryFree(pTrans->rpcRsp);
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
...@@ -511,7 +536,7 @@ static void mndTransDropData(STrans *pTrans) { ...@@ -511,7 +536,7 @@ static void mndTransDropData(STrans *pTrans) {
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mDebug("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), mTrace("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
callFunc); callFunc);
if (pTrans->stopFunc > 0 && callFunc) { if (pTrans->stopFunc > 0 && callFunc) {
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc); TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
...@@ -524,20 +549,35 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { ...@@ -524,20 +549,35 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
return 0; return 0;
} }
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
if (pNew->stage == TRN_STAGE_COMMIT) { for (int32_t i = 0; i < taosArrayGetSize(pOldArray); ++i) {
pNew->stage = TRN_STAGE_COMMIT_LOG; STransAction *pOldAction = taosArrayGet(pOldArray, i);
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG)); STransAction *pNewAction = taosArrayGet(pNewArray, i);
} pOldAction->rawWritten = pNewAction->rawWritten;
pOldAction->msgSent = pNewAction->msgSent;
if (pNew->stage == TRN_STAGE_ROLLBACK) { pOldAction->msgReceived = pNewAction->msgReceived;
pNew->stage = TRN_STAGE_FINISHED; pOldAction->errCode = pNewAction->errCode;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED));
} }
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
pOld->stage = pNew->stage; pOld->stage = pNew->stage;
pOld->redoActionPos = pNew->redoActionPos;
if (pOld->stage == TRN_STAGE_COMMIT) {
pOld->stage = TRN_STAGE_COMMIT_ACTION;
mTrace("trans:%d, stage from commit to commitAction", pNew->id);
}
if (pOld->stage == TRN_STAGE_ROLLBACK) {
pOld->stage = TRN_STAGE_FINISHED;
mTrace("trans:%d, stage from rollback to finished", pNew->id);
}
return 0; return 0;
} }
...@@ -566,40 +606,32 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S ...@@ -566,40 +606,32 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->type = type; pTrans->type = type;
pTrans->parallel = TRN_EXEC_PARALLEL;
pTrans->createdTime = taosGetTimestampMs(); pTrans->createdTime = taosGetTimestampMs();
if (pReq != NULL) pTrans->rpcInfo = pReq->info;
pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) {
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr()); mError("failed to create transaction since %s", terrstr());
return NULL; return NULL;
} }
mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans); if (pReq != NULL) pTrans->rpcInfo = pReq->info;
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
static void mndTransDropLogs(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
sdbFreeRaw(pRaw);
}
taosArrayDestroy(pArray);
}
static void mndTransDropActions(SArray *pArray) { static void mndTransDropActions(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray); int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i); STransAction *pAction = taosArrayGet(pArray, i);
taosMemoryFreeClear(pAction->pCont); if (pAction->isRaw) {
taosMemoryFreeClear(pAction->pRaw);
} else {
taosMemoryFreeClear(pAction->pCont);
}
} }
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
...@@ -608,18 +640,15 @@ static void mndTransDropActions(SArray *pArray) { ...@@ -608,18 +640,15 @@ static void mndTransDropActions(SArray *pArray) {
void mndTransDrop(STrans *pTrans) { void mndTransDrop(STrans *pTrans) {
if (pTrans != NULL) { if (pTrans != NULL) {
mndTransDropData(pTrans); mndTransDropData(pTrans);
mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans); mTrace("trans:%d, local object is freed, data:%p", pTrans->id, pTrans);
taosMemoryFreeClear(pTrans); taosMemoryFreeClear(pTrans);
} }
} }
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
if (pArray == NULL || pRaw == NULL) { pAction->id = taosArrayGetSize(pArray);
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
void *ptr = taosArrayPush(pArray, &pRaw); void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -628,27 +657,28 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { ...@@ -628,27 +657,28 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
return 0; return 0;
} }
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .isRaw = true, .pRaw = pRaw};
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); } return mndTransAppendAction(pTrans->redoActions, &action);
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); }
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
void *ptr = taosArrayPush(pArray, pAction); STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .isRaw = true, .pRaw = pRaw};
if (ptr == NULL) { return mndTransAppendAction(pTrans->undoActions, &action);
terrno = TSDB_CODE_OUT_OF_MEMORY; }
return -1;
}
return 0; int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->commitActions, &action);
} }
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_REDO_ACTION;
return mndTransAppendAction(pTrans->redoActions, pAction); return mndTransAppendAction(pTrans->redoActions, pAction);
} }
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_UNDO_ACTION;
return mndTransAppendAction(pTrans->undoActions, pAction); return mndTransAppendAction(pTrans->undoActions, pAction);
} }
...@@ -665,11 +695,10 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void * ...@@ -665,11 +695,10 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
} }
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
pTrans->dbUid = pDb->uid;
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
} }
void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; } void mndTransSetNoParallel(STrans *pTrans) { pTrans->parallel = TRN_EXEC_NO_PARALLEL; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
...@@ -679,7 +708,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -679,7 +708,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
} }
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other nodes", pTrans->id); mDebug("trans:%d, sync to other mnodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
...@@ -732,7 +761,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { ...@@ -732,7 +761,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true; conflict = true;
} else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) { if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true; conflict = true;
} }
...@@ -745,7 +774,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { ...@@ -745,7 +774,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true; conflict = true;
} else if (mndIsDbTrans(pTrans)) { } else if (mndIsDbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) { if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true; conflict = true;
} }
...@@ -768,7 +797,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -768,7 +797,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1; return -1;
} }
if (taosArrayGetSize(pTrans->commitLogs) <= 0) { if (taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1; return -1;
...@@ -799,8 +828,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -799,8 +828,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
} }
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0;
mDebug("trans:%d, commit transaction", pTrans->id); mDebug("trans:%d, commit transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) { if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr()); mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
...@@ -829,8 +856,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -829,8 +856,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} }
if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || if (pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
pTrans->stage == TRN_STAGE_ROLLBACK) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true; sendRsp = true;
} }
...@@ -848,13 +874,9 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -848,13 +874,9 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} }
taosMemoryFree(pTrans->rpcRsp); taosMemoryFree(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code, pTrans->stage, pTrans->rpcInfo.ahandle); mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
SRpcMsg rspMsg = { pTrans->rpcInfo.ahandle);
.code = code, SRpcMsg rspMsg = {.code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, .info = pTrans->rpcInfo};
.pCont = rpcCont,
.contLen = pTrans->rpcRspLen,
.info = pTrans->rpcInfo,
};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcInfo.handle = NULL; pTrans->rpcInfo.handle = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
...@@ -904,146 +926,125 @@ void mndTransProcessRsp(SRpcMsg *pRsp) { ...@@ -904,146 +926,125 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
} }
} }
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%04x", transId, action, pRsp->code, mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action,
pAction->acceptableCode); pRsp->code, pAction->acceptableCode);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
_OVER: _OVER:
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} }
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb; int32_t numOfActions = taosArrayGetSize(pArray);
int32_t arraySize = taosArrayGetSize(pArray);
if (arraySize == 0) return 0; for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction->msgSent && pAction->msgReceived &&
(pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode))
continue;
if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue;
int32_t code = 0; pAction->rawWritten = 0;
for (int32_t i = 0; i < arraySize; ++i) { pAction->msgSent = 0;
SSdbRaw *pRaw = taosArrayGetP(pArray, i); pAction->msgReceived = 0;
if (sdbWriteWithoutFree(pSdb, pRaw) != 0) { pAction->errCode = 0;
code = ((terrno != 0) ? terrno : -1); mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
}
} }
terrno = code;
return code;
} }
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->redoLogs); if (pAction->rawWritten) return 0;
if (code != 0) {
mError("failed to execute redoLogs since %s", terrstr());
}
return code;
}
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
int32_t code = mndTransExecuteLogs(pMnode, pTrans->undoLogs); if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
if (code != 0) { pAction->rawWritten = true;
mError("failed to execute undoLogs since %s, return success", terrstr()); pAction->errCode = 0;
code = 0;
mDebug("trans:%d, %s:%d write to sdb", pTrans->id, mndTransStr(pAction->stage), pAction->id);
} else {
pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d failed to write sdb since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
terrstr());
} }
return 0; // return success in any case
}
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitLogs);
if (code != 0) {
mError("failed to execute commitLogs since %s", terrstr());
}
return code; return code;
} }
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int32_t numOfActions = taosArrayGetSize(pArray); if (pAction->msgSent) return 0;
if (!pMnode->deploy && !mndIsMaster(pMnode)) return -1;
for (int32_t action = 0; action < numOfActions; ++action) { int64_t signature = pTrans->id;
STransAction *pAction = taosArrayGet(pArray, action); signature = (signature << 32);
if (pAction == NULL) continue; signature += pAction->id;
if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;
pAction->msgSent = 0; SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
if (code == 0) {
pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action); mDebug("trans:%d, %s:%d is sent to %s:%u", pTrans->id, mndTransStr(pAction->stage), pAction->id,
pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port);
} else {
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d not send since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr());
}
return code;
}
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->isRaw) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
} else {
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
} }
} }
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray); int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0;
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code != 0) break;
if (pAction->msgSent) {
if (pAction->msgReceived) {
continue;
} else {
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
} else {
continue;
}
}
}
int64_t signature = pTrans->id;
signature = (signature << 32);
signature += action;
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn,
pAction->epSet.eps[pAction->epSet.inUse].port);
pAction->msgSent = 1;
pAction->msgReceived = 0;
pAction->errCode = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
}
} else {
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = terrno;
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1;
}
} }
return 0; return code;
} }
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray); int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0; if (numOfActions == 0) return 0;
if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) { if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) {
return -1; return -1;
} }
int32_t numOfReceived = 0; int32_t numOfExecuted = 0;
int32_t errCode = 0; int32_t errCode = 0;
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; if (pAction->msgReceived || pAction->rawWritten) {
if (pAction->msgSent && pAction->msgReceived) { numOfExecuted++;
numOfReceived++;
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
errCode = pAction->errCode; errCode = pAction->errCode;
} }
} }
} }
if (numOfReceived == numOfActions) { if (numOfExecuted == numOfActions) {
if (errCode == 0) { if (errCode == 0) {
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0; return 0;
...@@ -1054,7 +1055,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -1054,7 +1055,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return errCode; return errCode;
} }
} else { } else {
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions); mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
} }
...@@ -1075,35 +1076,79 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { ...@@ -1075,35 +1076,79 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return code; return code;
} }
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true; int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions);
pTrans->stage = TRN_STAGE_REDO_LOG; if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage from prepare to redoLog", pTrans->id); mError("failed to execute commitActions since %s", terrstr());
return continueExec; }
return code;
} }
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true; int32_t code = 0;
int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code;
if (pTrans->redoActionPos >= numOfActions) return code;
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code == 0) {
if (pAction->msgSent) {
if (pAction->msgReceived) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
}
} else {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
}
if (pAction->rawWritten) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
}
}
}
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->redoActionPos++;
pTrans->stage = TRN_STAGE_REDO_ACTION; mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id); pAction->id);
} else { code = mndTransSync(pMnode, pTrans);
pTrans->code = terrno; if (code != 0) {
pTrans->stage = TRN_STAGE_UNDO_LOG; mError("trans:%d, failed to sync redoActionPos since %s", pTrans->id, terrstr());
mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr()); break;
}
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
break;
} else {
mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
terrstr());
break;
}
} }
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec; return continueExec;
} }
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); int32_t code = 0;
if (pTrans->parallel == TRN_EXEC_NO_PARALLEL) {
code = mndTransExecuteRedoActionsNoParallel(pMnode, pTrans);
} else {
code = mndTransExecuteRedoActions(pMnode, pTrans);
}
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
...@@ -1135,8 +1180,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1135,8 +1180,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT_LOG; pTrans->stage = TRN_STAGE_COMMIT_ACTION;
mDebug("trans:%d, stage from commit to commitLog", pTrans->id); mDebug("trans:%d, stage from commit to commitAction", pTrans->id);
continueExec = true; continueExec = true;
} else { } else {
pTrans->code = terrno; pTrans->code = terrno;
...@@ -1155,35 +1200,19 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1155,35 +1200,19 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); int32_t code = mndTransExecuteCommitActions(pMnode, pTrans);
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
pTrans->stage = TRN_STAGE_FINISHED; pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from commitLog to finished", pTrans->id); mDebug("trans:%d, stage from commitAction to finished", pTrans->id);
continueExec = true; continueExec = true;
} else { } else {
pTrans->code = terrno; pTrans->code = terrno;
pTrans->failedTimes++; pTrans->failedTimes++;
mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); mError("trans:%d, stage keep on commitAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
return continueExec;
}
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
continueExec = true;
} else {
mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
continueExec = false; continueExec = false;
} }
...@@ -1191,14 +1220,12 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1191,14 +1220,12 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
} }
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
if (code == 0) { if (code == 0) {
pTrans->stage = TRN_STAGE_UNDO_LOG; pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); mDebug("trans:%d, stage from undoAction to rollback", pTrans->id);
continueExec = true; continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
...@@ -1243,8 +1270,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1243,8 +1270,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
} }
mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); mDebug("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
return continueExec; return continueExec;
} }
...@@ -1257,24 +1283,18 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { ...@@ -1257,24 +1283,18 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE:
continueExec = mndTransPerformPrepareStage(pMnode, pTrans); continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
break; break;
case TRN_STAGE_REDO_LOG:
continueExec = mndTransPerformRedoLogStage(pMnode, pTrans);
break;
case TRN_STAGE_REDO_ACTION: case TRN_STAGE_REDO_ACTION:
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
break; break;
case TRN_STAGE_UNDO_LOG: case TRN_STAGE_COMMIT:
continueExec = mndTransPerformUndoLogStage(pMnode, pTrans); continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT_ACTION:
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
break; break;
case TRN_STAGE_UNDO_ACTION: case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break; break;
case TRN_STAGE_COMMIT_LOG:
continueExec = mndTransPerformCommitLogStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT:
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK: case TRN_STAGE_ROLLBACK:
continueExec = mndTransPerformRollbackStage(pMnode, pTrans); continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
break; break;
...@@ -1313,15 +1333,15 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { ...@@ -1313,15 +1333,15 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgReceived == 0) { if (pAction->msgReceived == 0) {
mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i); mInfo("trans:%d, %s:%d set processed for kill msg received", pTrans->id, mndTransStr(pAction->stage), i);
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = 0; pAction->errCode = 0;
} }
if (pAction->errCode != 0) { if (pAction->errCode != 0) {
mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i, mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
tstrerror(pAction->errCode)); mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = 0; pAction->errCode = 0;
......
...@@ -77,7 +77,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char ...@@ -77,7 +77,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("user:%s, will be created while deploy sdb, raw:%p", userObj.user, pRaw); mDebug("user:%s, will be created when deploying, raw:%p", userObj.user, pRaw);
#if 0 #if 0
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
......
...@@ -501,7 +501,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -501,7 +501,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
*ppVgroups = pVgroups; *ppVgroups = pVgroups;
code = 0; code = 0;
mInfo("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
_OVER: _OVER:
if (code != 0) taosMemoryFree(pVgroups); if (code != 0) taosMemoryFree(pVgroups);
...@@ -539,7 +539,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { ...@@ -539,7 +539,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
pVgid->role = TAOS_SYNC_STATE_FOLLOWER; pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
pDnode->numOfVnodes++; pDnode->numOfVnodes++;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
maxPos++; maxPos++;
if (maxPos == 3) return 0; if (maxPos == 3) return 0;
} }
......
...@@ -168,6 +168,7 @@ typedef struct SSdb { ...@@ -168,6 +168,7 @@ typedef struct SSdb {
char *currDir; char *currDir;
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t lastCommitVer;
int64_t lastCommitTerm;
int64_t curVer; int64_t curVer;
int64_t curTerm; int64_t curTerm;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
......
...@@ -55,6 +55,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -55,6 +55,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
pSdb->curVer = -1; pSdb->curVer = -1;
pSdb->curTerm = -1; pSdb->curTerm = -1;
pSdb->lastCommitVer = -1; pSdb->lastCommitVer = -1;
pSdb->lastCommitTerm = -1;
pSdb->pMnode = pOption->pMnode; pSdb->pMnode = pOption->pMnode;
taosThreadMutexInit(&pSdb->filelock, NULL); taosThreadMutexInit(&pSdb->filelock, NULL);
mDebug("sdb init successfully"); mDebug("sdb init successfully");
......
...@@ -70,6 +70,7 @@ static void sdbResetData(SSdb *pSdb) { ...@@ -70,6 +70,7 @@ static void sdbResetData(SSdb *pSdb) {
pSdb->curVer = -1; pSdb->curVer = -1;
pSdb->curTerm = -1; pSdb->curTerm = -1;
pSdb->lastCommitVer = -1; pSdb->lastCommitVer = -1;
pSdb->lastCommitTerm = -1;
mDebug("sdb reset successfully"); mDebug("sdb reset successfully");
} }
...@@ -211,12 +212,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -211,12 +212,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file); mDebug("start to read sdb file:%s", file);
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
if (pRaw == NULL) { if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read file since %s", terrstr()); mError("failed read sdb file since %s", terrstr());
return -1; return -1;
} }
...@@ -224,12 +225,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -224,12 +225,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if (pFile == NULL) { if (pFile == NULL) {
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, terrstr()); mError("failed to read sdb file:%s since %s", file, terrstr());
return 0; return 0;
} }
if (sdbReadFileHead(pSdb, pFile) != 0) { if (sdbReadFileHead(pSdb, pFile) != 0) {
mError("failed to read file:%s head since %s", file, terrstr()); mError("failed to read sdb file:%s head since %s", file, terrstr());
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
taosCloseFile(&pFile); taosCloseFile(&pFile);
return -1; return -1;
...@@ -245,13 +246,13 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -245,13 +246,13 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if (ret < 0) { if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break; break;
} }
if (ret != readLen) { if (ret != readLen) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break; break;
} }
...@@ -259,34 +260,36 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -259,34 +260,36 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
ret = taosReadFile(pFile, pRaw->pData, readLen); ret = taosReadFile(pFile, pRaw->pData, readLen);
if (ret < 0) { if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break; break;
} }
if (ret != readLen) { if (ret != readLen) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break; break;
} }
int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t); int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) { if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
code = TSDB_CODE_CHECKSUM_ERROR; code = TSDB_CODE_CHECKSUM_ERROR;
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break; break;
} }
code = sdbWriteWithoutFree(pSdb, pRaw); code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("failed to read file:%s since %s", file, terrstr()); mError("failed to read sdb file:%s since %s", file, terrstr());
goto _OVER; goto _OVER;
} }
} }
code = 0; code = 0;
pSdb->lastCommitVer = pSdb->curVer; pSdb->lastCommitVer = pSdb->curVer;
pSdb->lastCommitTerm = pSdb->curTerm;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); mDebug("read sdb file:%s successfully, ver:%" PRId64 " term:%" PRId64, file, pSdb->lastCommitVer,
pSdb->lastCommitTerm);
_OVER: _OVER:
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -302,7 +305,7 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -302,7 +305,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
sdbResetData(pSdb); sdbResetData(pSdb);
int32_t code = sdbReadFileImp(pSdb); int32_t code = sdbReadFileImp(pSdb);
if (code != 0) { if (code != 0) {
mError("failed to read sdb since %s", terrstr()); mError("failed to read sdb file since %s", terrstr());
sdbResetData(pSdb); sdbResetData(pSdb);
} }
...@@ -318,18 +321,19 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -318,18 +321,19 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char curfile[PATH_MAX] = {0}; char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64
pSdb->curTerm, pSdb->lastCommitVer); " file:%s",
pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, terrstr()); mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
return -1; return -1;
} }
if (sdbWriteFileHead(pSdb, pFile) != 0) { if (sdbWriteFileHead(pSdb, pFile) != 0) {
mError("failed to write file:%s head since %s", tmpfile, terrstr()); mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
taosCloseFile(&pFile); taosCloseFile(&pFile);
return -1; return -1;
} }
...@@ -338,7 +342,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -338,7 +342,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i]; SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue; if (encodeFp == NULL) continue;
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
TdThreadRwlock *pLock = &pSdb->locks[i]; TdThreadRwlock *pLock = &pSdb->locks[i];
...@@ -394,7 +398,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -394,7 +398,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code = taosFsyncFile(pFile); code = taosFsyncFile(pFile);
if (code != 0) { if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
mError("failed to sync file:%s since %s", tmpfile, tstrerror(code)); mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
} }
} }
...@@ -404,15 +408,17 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -404,15 +408,17 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code = taosRenameFile(tmpfile, curfile); code = taosRenameFile(tmpfile, curfile);
if (code != 0) { if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
mError("failed to write file:%s since %s", curfile, tstrerror(code)); mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
} }
} }
if (code != 0) { if (code != 0) {
mError("failed to write file:%s since %s", curfile, tstrerror(code)); mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
} else { } else {
pSdb->lastCommitVer = pSdb->curVer; pSdb->lastCommitVer = pSdb->curVer;
mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm); pSdb->lastCommitTerm = pSdb->curTerm;
mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer,
pSdb->lastCommitTerm, curfile);
} }
terrno = code; terrno = code;
...@@ -427,7 +433,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -427,7 +433,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
int32_t code = sdbWriteFileImp(pSdb); int32_t code = sdbWriteFileImp(pSdb);
if (code != 0) { if (code != 0) {
mError("failed to write sdb since %s", terrstr()); mError("failed to write sdb file since %s", terrstr());
} }
taosThreadMutexUnlock(&pSdb->filelock); taosThreadMutexUnlock(&pSdb->filelock);
return code; return code;
...@@ -493,7 +499,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -493,7 +499,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
if (taosCopyFile(datafile, pIter->name) < 0) { if (taosCopyFile(datafile, pIter->name) < 0) {
taosThreadMutexUnlock(&pSdb->filelock); taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to copy file %s to %s since %s", datafile, pIter->name, terrstr()); mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
sdbCloseIter(pIter); sdbCloseIter(pIter);
return -1; return -1;
} }
...@@ -502,7 +508,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -502,7 +508,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
pIter->file = taosOpenFile(pIter->name, TD_FILE_READ); pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
if (pIter->file == NULL) { if (pIter->file == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s since %s", pIter->name, terrstr()); mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
sdbCloseIter(pIter); sdbCloseIter(pIter);
return -1; return -1;
} }
......
...@@ -79,7 +79,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -79,7 +79,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
if (taskHandle) { if (taskHandle) {
code = qExecTask(taskHandle, &pRes, &useconds); code = qExecTask(taskHandle, &pRes, &useconds);
if (code) { if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
} else {
QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
}
QW_ERR_RET(code); QW_ERR_RET(code);
} }
} }
......
...@@ -283,9 +283,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) { ...@@ -283,9 +283,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid)); tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
tjsonAddStringToObject(item, "dbname", pObj->dbname); tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddIntegerToObject(item, "redoLogNum", taosArrayGetSize(pObj->redoLogs)); tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "undoLogNum", taosArrayGetSize(pObj->undoLogs));
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitLogs));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册