From f00b7b3af013040d13a21692989f86c9ec9dc618 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Feb 2023 14:09:56 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndConsumer.c | 7 ++- source/dnode/mnode/impl/src/mndScheduler.c | 2 - source/dnode/mnode/impl/src/mndSubscribe.c | 1 + source/dnode/mnode/impl/src/mndTopic.c | 71 ++++++++++++++-------- source/dnode/mnode/impl/src/mndUser.c | 8 ++- 5 files changed, 58 insertions(+), 31 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2059314c64..f51125e368 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -256,8 +256,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64, - pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime); + mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64", hbstatus:%d", + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime, + hbStatus); if (status == MQ_CONSUMER_STATUS__READY) { if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { @@ -269,6 +270,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg), }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { @@ -282,6 +284,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg), }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index ca79b8e122..b5fba58ba8 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -551,8 +551,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0); } - ASSERT(pSub->unassignedVgs); - void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0d805b04fc..d1ae203fd5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -122,6 +122,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 823d2ff1f0..8125035307 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -364,7 +364,8 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, const char *userName) { - mInfo("topic:%s to create", pCreate->name); + mInfo("topic:%s created", pCreate->name); + SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -383,19 +384,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.sqlLen = strlen(pCreate->sql) + 1; topicObj.subType = pCreate->subType; topicObj.withMeta = pCreate->withMeta; - if (topicObj.withMeta) { - if (topicObj.subType == TOPIC_SUB_TYPE__COLUMN) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } + + if (topicObj.withMeta && topicObj.subType == TOPIC_SUB_TYPE__COLUMN) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; } if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; - qDebugL("ast %s", topicObj.ast); + qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast); SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { @@ -409,18 +409,20 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + mError("failed to create topic:%s since %s", pCreate->name, terrstr()); taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.sql); return -1; } - int64_t ntbUid; topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t)); if (topicObj.ntbColIds == NULL) { + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + extractTopicTbInfo(pAst, &topicObj); if (topicObj.ntbUid == 0) { @@ -467,7 +469,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFreeClear(topicObj.physicalPlan); return -1; } - mInfo("trans:%d, used to create topic:%s", pTrans->id, pCreate->name); + + mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { @@ -476,6 +479,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * mndTransDrop(pTrans); return -1; } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { @@ -544,7 +548,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFreeClear(topicObj.sql); taosMemoryFreeClear(topicObj.ast); taosArrayDestroy(topicObj.ntbColIds); - if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema); + + if (topicObj.schema.nCols) { + taosMemoryFreeClear(topicObj.schema.pSchema); + } + mndTransDrop(pTrans); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -589,11 +597,13 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { } code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); + mError("failed to create topic:%s since %s", createTopicReq.name, terrstr()); } mndReleaseTopic(pMnode, pTopic); @@ -605,13 +615,18 @@ _OVER: static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) { int32_t code = -1; - if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER; + if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) { + goto _OVER; + } SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) { + goto _OVER; + } + code = 0; _OVER: @@ -650,7 +665,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { SMqConsumerObj *pConsumer; while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; @@ -661,7 +678,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mndReleaseConsumer(pMnode, pConsumer); mndReleaseTopic(pMnode, pTopic); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name, + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); return -1; } @@ -773,6 +790,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { + *pNumOfTopics = 0; + SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { @@ -785,7 +804,9 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo while (1) { SMqTopicObj *pTopic = NULL; pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } if (pTopic->dbUid == pDb->uid) { numOfTopics++; @@ -814,10 +835,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl int32_t cols = 0; char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0}; - tstrncpy(varDataVal(topicName), mndGetDbStr(pTopic->name), sizeof(topicName) - 2); - /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/ - /*tNameGetDbName(&n, varDataVal(topicName));*/ - varDataSetLen(topicName, strlen(varDataVal(topicName))); + const char* pName = mndGetDbStr(pTopic->name); + STR_TO_VARSTR(topicName, pName); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); @@ -825,6 +845,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB); tNameGetDbName(&n, varDataVal(dbName)); varDataSetLen(dbName, strlen(varDataVal(dbName))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)dbName, false); @@ -832,8 +853,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN); - varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); + STR_TO_VARSTR(sql, pTopic->sql); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)sql, false); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index b965e13316..b1c346a2d7 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1062,10 +1062,14 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { while (1) { pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } code = -1; - if (mndUserDupObj(pUser, &newUser) != 0) break; + if (mndUserDupObj(pUser, &newUser) != 0) { + break; + } bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL); if (inTopic) { -- GitLab