diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index d3cb19231e565df33ffab44d6b749cd0b9416fe7..4bbe531bf8e1bb50598e0a801a0552817084a34e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -377,6 +377,10 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, const char *userName) { mInfo("start to create topic:%s", pCreate->name); + STrans *pTrans = NULL; + int32_t code = -1; + SNode *pAst = NULL; + SQueryPlan *pPlan = NULL; SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -401,7 +405,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (pCreate->withMeta) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + goto _OUT; } topicObj.ast = taosStrdup(pCreate->ast); @@ -409,32 +413,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast); - SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { - taosMemoryFree(topicObj.ast); - taosMemoryFree(topicObj.sql); mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + goto _OUT; } - SQueryPlan *pPlan = NULL; - SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { mError("failed to create topic:%s since %s", pCreate->name, terrstr()); - taosMemoryFree(topicObj.ast); - taosMemoryFree(topicObj.sql); - nodesDestroyNode(pAst); - return -1; + goto _OUT; } topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t)); if (topicObj.ntbColIds == NULL) { - taosMemoryFree(topicObj.ast); - taosMemoryFree(topicObj.sql); - nodesDestroyNode(pAst); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + goto _OUT; } extractTopicTbInfo(pAst, &topicObj); @@ -446,25 +439,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - taosMemoryFree(topicObj.ast); - taosMemoryFree(topicObj.sql); - nodesDestroyNode(pAst); - return -1; + goto _OUT; } if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - taosMemoryFree(topicObj.ast); - taosMemoryFree(topicObj.sql); - return -1; + goto _OUT; } - nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) { SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName); if (pStb == NULL) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; - return -1; + goto _OUT; } strcpy(topicObj.stbName, pCreate->subStbName); @@ -483,23 +469,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * /*topicObj.withTbName = 1;*/ /*topicObj.withSchema = 1;*/ - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic"); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - taosMemoryFreeClear(topicObj.ast); - taosMemoryFreeClear(topicObj.sql); - taosMemoryFreeClear(topicObj.physicalPlan); - return -1; + goto _OUT; } + mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + goto _OUT; + } mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - taosMemoryFreeClear(topicObj.physicalPlan); - mndTransDrop(pTrans); - return -1; + goto _OUT; } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); @@ -528,17 +513,16 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); - mndTransDrop(pTrans); - return -1; + goto _OUT; } void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, len); if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { + taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); - mndTransDrop(pTrans); - return -1; + goto _OUT; } tEncoderClear(&encoder); ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); @@ -551,32 +535,32 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); - mndTransDrop(pTrans); - return -1; + goto _OUT; } - + buf = NULL; sdbRelease(pSdb, pVgroup); } } if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - taosMemoryFreeClear(topicObj.physicalPlan); - mndTransDrop(pTrans); - return -1; + goto _OUT; } + code = TSDB_CODE_ACTION_IN_PROGRESS; + +_OUT: taosMemoryFreeClear(topicObj.physicalPlan); taosMemoryFreeClear(topicObj.sql); taosMemoryFreeClear(topicObj.ast); taosArrayDestroy(topicObj.ntbColIds); - if (topicObj.schema.nCols) { taosMemoryFreeClear(topicObj.schema.pSchema); } - + nodesDestroyNode(pAst); + nodesDestroyNode((SNode *)pPlan); mndTransDrop(pTrans); - return TSDB_CODE_ACTION_IN_PROGRESS; + return code; } static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {