未验证 提交 a1b97b7a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12881 from taosdata/fix/dnode

fix: commit log should not be null
......@@ -253,6 +253,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D5)
// mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
......
......@@ -35,7 +35,7 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
#ifdef __cplusplus
}
......
......@@ -419,7 +419,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
mndReleaseTopic(pMnode, pTopic);
}
......
......@@ -448,13 +448,13 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
}
mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
SSdbRaw *pRedoRaw = mndDnodeActionEncode(&dnodeObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndDnodeActionEncode(&dnodeObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......@@ -524,13 +524,13 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) {
}
mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
SSdbRaw *pRedoRaw = mndDnodeActionEncode(pDnode);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
......@@ -153,6 +153,7 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa
return -1;
}
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
// commit log or redo log?
if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) {
return -1;
}
......@@ -188,7 +189,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
pOffsetObj->offset = pOffset->offset;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pOffsetRaw);
mndTransAppendCommitlog(pTrans, pOffsetRaw);
if (create) {
taosMemoryFree(pOffsetObj);
} else {
......
......@@ -743,9 +743,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) {
goto _OVER;
}
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
......
......@@ -279,13 +279,13 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
}
mDebug("trans:%d, used to create stream:%s", pTrans->id, pStream->name);
SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
......
......@@ -479,7 +479,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
}
}
......
......@@ -386,14 +386,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
}
mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
taosMemoryFreeClear(topicObj.physicalPlan);
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......@@ -473,13 +473,13 @@ CREATE_TOPIC_OVER:
}
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......@@ -627,11 +627,11 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
return numOfRows;
}
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
......
......@@ -768,6 +768,12 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
if (taosArrayGetSize(pTrans->commitLogs) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, prepare transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
......@@ -272,13 +272,13 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
}
mDebug("trans:%d, used to create user:%s", pTrans->id, pCreate->user);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......@@ -352,13 +352,13 @@ static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpc
}
mDebug("trans:%d, used to alter user:%s", pTrans->id, pOld->user);
SSdbRaw *pRedoRaw = mndUserActionEncode(pNew);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndUserActionEncode(pNew);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......@@ -559,13 +559,13 @@ static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) {
}
mDebug("trans:%d, used to drop user:%s", pTrans->id, pUser->user);
SSdbRaw *pRedoRaw = mndUserActionEncode(pUser);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
SSdbRaw *pCommitRaw = mndUserActionEncode(pUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
......@@ -31,7 +31,7 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
)
add_test(
NAME transTest2
COMMAND transTest2
)
# add_test(
# NAME transTest2
# COMMAND transTest2
# )
......@@ -259,6 +259,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exist
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
// mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册