未验证 提交 d7e9eda6 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #21844 from taosdata/FIX/TD-24924-3.0

enh:  change trans conflict scope of create topic
...@@ -377,6 +377,10 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { ...@@ -377,6 +377,10 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
const char *userName) { const char *userName) {
mInfo("start to create topic:%s", pCreate->name); mInfo("start to create topic:%s", pCreate->name);
STrans *pTrans = NULL;
int32_t code = -1;
SNode *pAst = NULL;
SQueryPlan *pPlan = NULL;
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
...@@ -401,7 +405,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -401,7 +405,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (pCreate->withMeta) { if (pCreate->withMeta) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1; goto _OUT;
} }
topicObj.ast = taosStrdup(pCreate->ast); topicObj.ast = taosStrdup(pCreate->ast);
...@@ -409,32 +413,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -409,32 +413,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast); qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) { if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1; goto _OUT;
} }
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
mError("failed to create topic:%s since %s", pCreate->name, terrstr()); mError("failed to create topic:%s since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); goto _OUT;
taosMemoryFree(topicObj.sql);
nodesDestroyNode(pAst);
return -1;
} }
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t)); topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
if (topicObj.ntbColIds == NULL) { if (topicObj.ntbColIds == NULL) {
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; goto _OUT;
} }
extractTopicTbInfo(pAst, &topicObj); extractTopicTbInfo(pAst, &topicObj);
...@@ -446,25 +439,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -446,25 +439,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); goto _OUT;
taosMemoryFree(topicObj.sql);
nodesDestroyNode(pAst);
return -1;
} }
if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) { if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); goto _OUT;
taosMemoryFree(topicObj.sql);
return -1;
} }
nodesDestroyNode(pAst);
nodesDestroyNode((SNode *)pPlan);
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) { } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName); SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
if (pStb == NULL) { if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST; terrno = TSDB_CODE_MND_STB_NOT_EXIST;
return -1; goto _OUT;
} }
strcpy(topicObj.stbName, pCreate->subStbName); strcpy(topicObj.stbName, pCreate->subStbName);
...@@ -483,23 +469,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -483,23 +469,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
/*topicObj.withTbName = 1;*/ /*topicObj.withTbName = 1;*/
/*topicObj.withSchema = 1;*/ /*topicObj.withSchema = 1;*/
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-topic"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFreeClear(topicObj.ast); goto _OUT;
taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.physicalPlan);
return -1;
} }
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
goto _OUT;
}
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
taosMemoryFreeClear(topicObj.physicalPlan); goto _OUT;
mndTransDrop(pTrans);
return -1;
} }
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
...@@ -528,17 +513,16 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -528,17 +513,16 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) { if (code < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); goto _OUT;
return -1;
} }
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); goto _OUT;
return -1;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
...@@ -551,32 +535,32 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -551,32 +535,32 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); goto _OUT;
return -1;
} }
buf = NULL;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
taosMemoryFreeClear(topicObj.physicalPlan); goto _OUT;
mndTransDrop(pTrans);
return -1;
} }
code = TSDB_CODE_ACTION_IN_PROGRESS;
_OUT:
taosMemoryFreeClear(topicObj.physicalPlan); taosMemoryFreeClear(topicObj.physicalPlan);
taosMemoryFreeClear(topicObj.sql); taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.ast); taosMemoryFreeClear(topicObj.ast);
taosArrayDestroy(topicObj.ntbColIds); taosArrayDestroy(topicObj.ntbColIds);
if (topicObj.schema.nCols) { if (topicObj.schema.nCols) {
taosMemoryFreeClear(topicObj.schema.pSchema); taosMemoryFreeClear(topicObj.schema.pSchema);
} }
nodesDestroyNode(pAst);
nodesDestroyNode((SNode *)pPlan);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS; return code;
} }
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册