提交 1035312f 编写于 作者: S Shengliang Guan

refactor: make trans support db conflict

上级 5f74daa5
......@@ -57,6 +57,7 @@ typedef enum {
TRN_CONFLICT_NOTHING = 0,
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
} ETrnConflct;
typedef enum {
......
......@@ -34,7 +34,7 @@ typedef struct {
int32_t errCode;
int32_t acceptableCode;
int8_t stage;
int8_t isRaw;
int8_t actionType; // 0-msg, 1-raw
int8_t rawWritten;
int8_t msgSent;
int8_t msgReceived;
......
......@@ -735,7 +735,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
......@@ -1257,7 +1257,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if (code != 0) goto _OVER;
code = -1;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
......@@ -1403,7 +1403,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
......
......@@ -88,12 +88,12 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->isRaw) {
if (pAction->actionType) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
} else {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
rawDataLen += sizeof(pAction->isRaw);
rawDataLen += sizeof(pAction->actionType);
}
return rawDataLen;
......@@ -135,9 +135,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
if (pAction->actionType) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
......@@ -157,9 +157,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
if (pAction->actionType) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
......@@ -179,9 +179,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
if (pAction->actionType) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
......@@ -279,9 +279,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
......@@ -308,9 +308,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
......@@ -337,9 +337,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
......@@ -552,7 +552,7 @@ static void mndTransDropActions(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->isRaw) {
if (pAction->actionType) {
taosMemoryFreeClear(pAction->pRaw);
} else {
taosMemoryFreeClear(pAction->pCont);
......@@ -583,17 +583,17 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .isRaw = true, .pRaw = pRaw};
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->redoActions, &action);
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .isRaw = true, .pRaw = pRaw};
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->undoActions, &action);
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .isRaw = true, .pRaw = pRaw};
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->commitActions, &action);
}
......@@ -646,36 +646,38 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return 0;
}
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
STrans *pTrans = NULL;
void *pIter = NULL;
bool conflict = false;
if (pNewTrans->conflict == TRN_CONFLICT_NOTHING) return conflict;
if (pNew->conflict == TRN_CONFLICT_NOTHING) return conflict;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) break;
if (pNewTrans->conflict == TRN_CONFLICT_GLOBAL) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true;
} else if (pNewTrans->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL || strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true;
if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pNew->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
}
} else {
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
}
sdbRelease(pMnode->pSdb, pTrans);
}
if (conflict) {
mError("trans:%d, can't execute since conflict with trans:%d, db:%s", pNew->id, pTrans->id, pTrans->dbname);
}
return conflict;
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (pTrans->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
......@@ -893,7 +895,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
}
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->isRaw) {
if (pAction->actionType) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
} else {
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册