diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 69e10bd649a1f2b8df904a7f4bf908d36bb2b39b..df4b2204b4d1f36ff8e0fc6efd5854e82d840d73 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -824,7 +824,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId); + tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); @@ -832,7 +832,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); + tscDebug("consumer:0x%"PRIx64" commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { // do nothing @@ -1578,25 +1578,20 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { } void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { - /*strcpy(pReq->topic, pTopic->topicName);*/ - /*strcpy(pReq->cgroup, tmq->groupId);*/ - int32_t groupLen = strlen(tmq->groupId); memcpy(pReq->subKey, tmq->groupId, groupLen); pReq->subKey[groupLen] = TMQ_SEPARATOR; strcpy(pReq->subKey + groupLen + 1, pTopic->topicName); pReq->withTbName = tmq->withTbName; - pReq->timeout = timeout; pReq->consumerId = tmq->consumerId; + pReq->timeout = timeout; pReq->epoch = tmq->epoch; /*pReq->currentOffset = reqOffset;*/ pReq->reqOffset = pVg->currentOffset; - pReq->reqId = generateRequestId(); - - pReq->useSnapshot = tmq->useSnapshot; - pReq->head.vgId = pVg->vgId; + pReq->useSnapshot = tmq->useSnapshot; + pReq->reqId = generateRequestId(); } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { @@ -1646,6 +1641,76 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } +static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&pTmq->rspSem); + return -1; +} + +static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { + SMqPollReq req = {0}; + tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg); + + int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); + if (msgSize < 0) { + return handleErrorBeforePoll(pVg, pTmq); + } + + char* msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + return handleErrorBeforePoll(pVg, pTmq); + } + + if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { + taosMemoryFree(msg); + return handleErrorBeforePoll(pVg, pTmq); + } + + SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); + if (pParam == NULL) { + taosMemoryFree(msg); + return handleErrorBeforePoll(pVg, pTmq); + } + + pParam->refId = pTmq->refId; + pParam->epoch = pTmq->epoch; + pParam->pVg = pVg; // pVg may be released,fix it + pParam->pTopic = pTopic; + pParam->vgId = pVg->vgId; + + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + taosMemoryFree(pParam); + taosMemoryFree(msg); + return handleErrorBeforePoll(pVg, pTmq); + } + + sendInfo->msgInfo = (SDataBuf){ + .pData = msg, + .len = msgSize, + .handle = NULL, + }; + + sendInfo->requestId = req.reqId; + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqPollCb; + sendInfo->msgType = TDMT_VND_TMQ_CONSUME; + + int64_t transporterId = 0; + char offsetFormatBuf[80]; + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset); + + tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64, + pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId); + asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + + pVg->pollCnt++; + pTmq->pollCnt++; + + return TSDB_CODE_SUCCESS; +} + // broadcast the poll request to all related vnodes int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); @@ -1654,7 +1719,9 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + int32_t numOfVg = taosArrayGetSize(pTopic->vgs); + + for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { @@ -1673,77 +1740,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } atomic_store_32(&pVg->vgSkipCnt, 0); - - SMqPollReq req = {0}; - tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg); - int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); - if (msgSize < 0) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&tmq->rspSem); - return -1; - } - - char* msg = taosMemoryCalloc(1, msgSize); - if (NULL == msg) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&tmq->rspSem); - return -1; - } - - if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { - taosMemoryFree(msg); - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&tmq->rspSem); - return -1; - } - - SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); - if (pParam == NULL) { - taosMemoryFree(msg); - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&tmq->rspSem); - return -1; - } - - pParam->refId = tmq->refId; - pParam->epoch = tmq->epoch; - - pParam->pVg = pVg; - pParam->pTopic = pTopic; - pParam->vgId = pVg->vgId; - - SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) { - taosMemoryFree(msg); - taosMemoryFree(pParam); - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&tmq->rspSem); - return -1; + int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout); + if (code != TSDB_CODE_SUCCESS) { + return code; } - - sendInfo->msgInfo = (SDataBuf){ - .pData = msg, - .len = msgSize, - .handle = NULL, - }; - - sendInfo->requestId = req.reqId; - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqPollCb; - sendInfo->msgType = TDMT_VND_TMQ_CONSUME; - - int64_t transporterId = 0; - - char offsetFormatBuf[80]; - tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset); - - tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64, - tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - - pVg->pollCnt++; - tmq->pollCnt++; } } diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index 8cd669c769a399282ba1137ca8d24eeace039237..8ed7fc6a11043bddda295759d8ba59e9d26be1f8 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -27,14 +27,9 @@ void mndCleanupTopic(SMnode *pMnode); SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName); void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic); - -SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); -SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); - -int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb); - -const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); +int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb); +const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 618e6f3d474c74ee5db2dc7d5da26370239e6367..b056a1f4ab8e9432558b1fc313ae341aca63d6ae 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -256,8 +256,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64, - pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime); + mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64", hbstatus:%d", + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime, + hbStatus); if (status == MQ_CONSUMER_STATUS__READY) { if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { @@ -269,6 +270,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg), }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { @@ -282,6 +284,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg), }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bc23b27ce35d131514b3166e79b588734296bafd..1fd49211e0ccc84ce6fb4a6fed618cc0b62496c3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1035,13 +1035,23 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db"); - if (pTrans == NULL) goto _OVER; + if (pTrans == NULL) { + goto _OVER; + } + + mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name); - mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name); mndTransSetDbName(pTrans, pDb->name, NULL); - if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; - if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER; + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + goto _OVER; + } + + if (mndTopicExistsForDb(pMnode, pDb)) { + terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED; + goto _OVER; + } if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; @@ -1092,10 +1102,12 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) { } code = mndDropDb(pMnode, pReq, pDb); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } _OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("db:%s, failed to drop since %s", dropReq.db, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index ca79b8e12223810f7df779db7be731d29796d613..b5fba58ba8c5cd023ffc56d678bc9f753f1a0acd 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -551,8 +551,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0); } - ASSERT(pSub->unassignedVgs); - void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4bf8f42506efc47a24cda836f986cf94e6758245..8d8fd1b9b5ef39273f697a3f17dcef2365b13529 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -122,6 +122,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b843534fcd7d782eac154842df51e80957265abc..4b50361c356618c37f7c00e746be1ccd0fab38d8 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -31,6 +31,9 @@ #define MND_TOPIC_VER_NUMBER 2 #define MND_TOPIC_RESERVE_SIZE 64 +SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); +SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); + static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic); @@ -79,6 +82,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { if (pTopic->physicalPlan) { physicalPlanLen = strlen(pTopic->physicalPlan) + 1; } + int32_t schemaLen = 0; if (pTopic->schema.nCols) { schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema); @@ -88,7 +92,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); - if (pRaw == NULL) goto TOPIC_ENCODE_OVER; + if (pRaw == NULL) { + goto TOPIC_ENCODE_OVER; + } int32_t dataPos = 0; SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER); @@ -106,6 +112,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); + if (pTopic->astLen) { SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER); } @@ -123,6 +130,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); } + SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER); if (pTopic->ntbUid != 0) { int32_t sz = taosArrayGetSize(pTopic->ntbColIds); @@ -132,6 +140,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER); } } + SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); @@ -247,10 +256,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER); taosArrayPush(pTopic->ntbColIds, &colId); } - SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER); + SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); - terrno = TSDB_CODE_SUCCESS; TOPIC_DECODE_OVER: @@ -266,12 +274,12 @@ TOPIC_DECODE_OVER: } static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) { - mTrace("topic:%s, perform insert action", pTopic->name); + mTrace("topic:%s perform insert action", pTopic->name); return 0; } static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) { - mTrace("topic:%s, perform delete action", pTopic->name); + mTrace("topic:%s perform delete action", pTopic->name); taosMemoryFreeClear(pTopic->sql); taosMemoryFreeClear(pTopic->ast); taosMemoryFreeClear(pTopic->physicalPlan); @@ -281,7 +289,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) { } static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) { - mTrace("topic:%s, perform update action", pOldTopic->name); + mTrace("topic:%s perform update action", pOldTopic->name); atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); @@ -364,7 +372,8 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, const char *userName) { - mInfo("topic:%s to create", pCreate->name); + mInfo("start to create topic:%s", pCreate->name); + SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -383,19 +392,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.sqlLen = strlen(pCreate->sql) + 1; topicObj.subType = pCreate->subType; topicObj.withMeta = pCreate->withMeta; - if (topicObj.withMeta) { - if (topicObj.subType == TOPIC_SUB_TYPE__COLUMN) { + + if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { + if (pCreate->withMeta) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } - } - if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; - qDebugL("ast %s", topicObj.ast); + qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast); SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { @@ -409,18 +417,20 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + mError("failed to create topic:%s since %s", pCreate->name, terrstr()); taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.sql); return -1; } - int64_t ntbUid; topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t)); if (topicObj.ntbColIds == NULL) { + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + extractTopicTbInfo(pAst, &topicObj); if (topicObj.ntbUid == 0) { @@ -449,6 +459,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * terrno = TSDB_CODE_MND_STB_NOT_EXIST; return -1; } + topicObj.stbUid = pStb->uid; mndReleaseStb(pMnode, pStb); } @@ -467,7 +478,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFreeClear(topicObj.physicalPlan); return -1; } - mInfo("trans:%d, used to create topic:%s", pTrans->id, pCreate->name); + + mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { @@ -476,6 +488,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * mndTransDrop(pTrans); return -1; } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { @@ -544,7 +557,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFreeClear(topicObj.sql); taosMemoryFreeClear(topicObj.ast); taosArrayDestroy(topicObj.ntbColIds); - if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema); + + if (topicObj.schema.nCols) { + taosMemoryFreeClear(topicObj.schema.pSchema); + } + mndTransDrop(pTrans); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -589,11 +606,13 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { } code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); + mError("failed to create topic:%s since %s", createTopicReq.name, terrstr()); } mndReleaseTopic(pMnode, pTopic); @@ -605,13 +624,18 @@ _OVER: static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) { int32_t code = -1; - if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER; + if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) { + goto _OVER; + } SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) { + goto _OVER; + } + code = 0; _OVER: @@ -650,7 +674,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { SMqConsumerObj *pConsumer; while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; @@ -661,7 +687,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mndReleaseConsumer(pMnode, pConsumer); mndReleaseTopic(pMnode, pTopic); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name, + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); return -1; } @@ -773,6 +799,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { + *pNumOfTopics = 0; + SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { @@ -785,7 +813,9 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo while (1) { SMqTopicObj *pTopic = NULL; pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } if (pTopic->dbUid == pDb->uid) { numOfTopics++; @@ -814,10 +844,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl int32_t cols = 0; char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0}; - tstrncpy(varDataVal(topicName), mndGetDbStr(pTopic->name), sizeof(topicName) - 2); - /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/ - /*tNameGetDbName(&n, varDataVal(topicName));*/ - varDataSetLen(topicName, strlen(varDataVal(topicName))); + const char* pName = mndGetDbStr(pTopic->name); + STR_TO_VARSTR(topicName, pName); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)topicName, false); @@ -825,6 +854,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB); tNameGetDbName(&n, varDataVal(dbName)); varDataSetLen(dbName, strlen(varDataVal(dbName))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dbName, false); @@ -832,8 +862,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN); - varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); + STR_TO_VARSTR(sql, pTopic->sql); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); @@ -868,24 +898,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) { - SSdb *pSdb = pMnode->pSdb; - +bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SMqTopicObj *pTopic = NULL; + while (1) { pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } if (pTopic->dbUid == pDb->uid) { sdbRelease(pSdb, pTopic); - terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED; - return -1; + return true; } sdbRelease(pSdb, pTopic); } - return 0; + + return false; } #if 0 diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index a03d640ad5f7b44e10ae1527e09fbd68b5fff96e..23822c8f208deb9761f66b32d0b0b31fc0f0e55a 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1062,10 +1062,14 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { while (1) { pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } code = -1; - if (mndUserDupObj(pUser, &newUser) != 0) break; + if (mndUserDupObj(pUser, &newUser) != 0) { + break; + } bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL); if (inTopic) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ba343732826c0ee4f83886f489e1a309955b9fc2..ed13196bd8d953f8ea0b9ec96baea06cf79b58b3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -114,16 +114,18 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } void tqClose(STQ* pTq) { - if (pTq) { - tqOffsetClose(pTq->pOffsetStore); - taosHashCleanup(pTq->pHandle); - taosHashCleanup(pTq->pPushMgr); - taosHashCleanup(pTq->pCheckInfo); - taosMemoryFree(pTq->path); - tqMetaClose(pTq); - streamMetaClose(pTq->pStreamMeta); - taosMemoryFree(pTq); + if (pTq == NULL) { + return; } + + tqOffsetClose(pTq->pOffsetStore); + taosHashCleanup(pTq->pHandle); + taosHashCleanup(pTq->pPushMgr); + taosHashCleanup(pTq->pCheckInfo); + taosMemoryFree(pTq->path); + tqMetaClose(pTq); + streamMetaClose(pTq->pStreamMeta); + taosMemoryFree(pTq); } int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { @@ -158,7 +160,7 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, }; tmsgSendRsp(&resp); - tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d", + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type); return 0; @@ -215,9 +217,9 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { char buf1[80] = {0}; char buf2[80] = {0}; - tFormatOffset(buf1, 80, &pRsp->reqOffset); - tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); + tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -273,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -332,8 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64 - ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -344,14 +345,16 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf pLeft->val.version <= pRight->val.version; } -int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqOffset offset = {0}; + SDecoder decoder; - tDecoderInit(&decoder, msg, msgLen); + tDecoderInit(&decoder, (uint8_t*) msg, msgLen); if (tDecodeSTqOffset(&decoder, &offset) < 0) { ASSERT(0); return -1; } + tDecoderClear(&decoder); if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) { @@ -360,17 +363,20 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m } else if (offset.val.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, TD_VID(pTq->pVnode), offset.val.version); - if (offset.val.version + 1 == version) { + if (offset.val.version + 1 == sversion) { offset.val.version += 1; } } else { - ASSERT(0); + tqError("invalid commit offset type:%d", offset.val.type); + return -1; } - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); - if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) { - return 0; + + STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); + if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) { + return 0; // no need to update the offset value } + // save the new offset value if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { ASSERT(0); return -1; @@ -378,27 +384,25 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m if (offset.val.type == TMQ_OFFSET__LOG) { STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle) { - if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - return -1; - } + if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) { + return -1; } } - // rsp - - /*}*/ - /*}*/ - return 0; } int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { void* pIter = NULL; + while (1) { pIter = taosHashIterate(pTq->pCheckInfo, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + STqCheckInfo* pCheck = (STqCheckInfo*)pIter; + if (pCheck->ntbUid == tbUid) { int32_t sz = taosArrayGetSize(pCheck->colIdList); for (int32_t i = 0; i < sz; i++) { @@ -410,6 +414,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { } } } + return 0; } @@ -454,6 +459,7 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { return -1; } + return 0; } @@ -472,19 +478,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t reqEpoch = req.epoch; STqOffsetVal reqOffset = req.reqOffset; - // 1.find handle + // 1. find handle STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - /*ASSERT(pHandle);*/ if (pHandle == NULL) { - tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, - TD_VID(pTq->pVnode), req.subKey); + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode), + req.subKey); return -1; } - // check rebalance + // 2. check rebalance if (pHandle->consumerId != consumerId) { - tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64 - ", in vgId:%d, subkey %s, handle consumer id %" PRId64, + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; return -1; @@ -498,7 +502,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { char buf[80]; tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); // 2.reset offset if needed @@ -510,7 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { fetchOffsetNew = pOffset->val; char formatBuf[80]; tFormatOffset(formatBuf, 80, &fetchOffsetNew); - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey, + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { @@ -595,8 +599,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer %" PRId64 - ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -619,8 +622,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { code = -1; } - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64 - ",version:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64 + ",version:%" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); taosMemoryFree(metaRsp.metaRsp); @@ -638,8 +641,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { fetchOffsetNew = taosxRsp.rspOffset; } - tqDebug("taosx poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",version:%" PRId64 "", + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + ",version:%" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version); } @@ -675,7 +678,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -716,12 +719,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } } } + tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return 0; } -int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey); @@ -755,10 +759,10 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL return 0; } -int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqCheckInfo info = {0}; SDecoder decoder; - tDecoderInit(&decoder, msg, msgLen); + tDecoderInit(&decoder, (uint8_t*) msg, msgLen); if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -775,7 +779,7 @@ int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m return 0; } -int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -787,7 +791,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m return 0; } -int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock @@ -975,7 +979,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamTaskCheckReq req; SDecoder decoder; - tDecoderInit(&decoder, msgBody, msgLen); + tDecoderInit(&decoder, (uint8_t*) msgBody, msgLen); tDecodeSStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); int32_t taskId = req.downstreamTaskId; @@ -997,7 +1001,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", + tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); SEncoder encoder; @@ -1028,7 +1032,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t code; SStreamTaskCheckRsp rsp; @@ -1041,7 +1045,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_ } tDecoderClear(&decoder); - tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", + tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); @@ -1049,12 +1053,12 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_ return -1; } - code = streamProcessTaskCheckRsp(pTask, &rsp, version); + code = streamProcessTaskCheckRsp(pTask, &rsp, sversion); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return code; } -int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t code; #if 0 code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); @@ -1069,6 +1073,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg if (pTask == NULL) { return -1; } + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); code = tDecodeSStreamTask(&decoder, pTask); @@ -1080,14 +1085,14 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg tDecoderClear(&decoder); // 2.save task - code = streamMetaAddTask(pTq->pStreamMeta, version, pTask); + code = streamMetaAddTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { return -1; } // 3.go through recover steps to fill history if (pTask->fillHistory) { - streamTaskCheckDownstream(pTask, version); + streamTaskCheckDownstream(pTask, sversion); } return 0; @@ -1158,7 +1163,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t code; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); @@ -1167,7 +1172,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m } // do recovery step 2 - code = streamSourceRecoverScanStep2(pTask, version); + code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; @@ -1215,7 +1220,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { SStreamRecoverFinishReq req; SDecoder decoder; - tDecoderInit(&decoder, msg, msgLen); + tDecoderInit(&decoder, (uint8_t*) msg, msgLen); tDecodeSStreamRecoverFinishReq(&decoder, &req); tDecoderClear(&decoder); @@ -1369,7 +1374,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { @@ -1453,7 +1461,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } } -int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); return 0; @@ -1465,7 +1473,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamRetrieveReq req; SDecoder decoder; - tDecoderInit(&decoder, msgBody, msgLen); + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; @@ -1498,7 +1506,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { SStreamDispatchReq req; SDecoder decoder; - tDecoderInit(&decoder, msgBody, msgLen); + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); @@ -1561,4 +1569,4 @@ FAIL: return -1; } -int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; } +int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }