diff --git a/include/client/taos.h b/include/client/taos.h index 5ea1510e444a37c792fa3b0d69d5c090ca14c550..3cc2d907ab5ca18d16a9553d336672d67e4f974c 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); -DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); +DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index aa234422913fe6d8d40694c02bc8dab388afcf51..7d12c2a1d6186171e1bf7c43d342e013970ebd86 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -312,7 +312,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3576df434be5583d901fae079c25f41680dd4d3f..fa2e250b2bc9c0a6134069bbb0826175d4ca61a1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); - - return TSDB_CODE_SUCCESS; + return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); } static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { @@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); return NULL; } @@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us end: taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); + if(pCommitFp != NULL) { + pCommitFp(tmq, code, userParam); + } return; } @@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { if (tmq == NULL) { tscError("invalid tmq handle, null"); + if(cb != NULL) { + cb(tmq, TSDB_CODE_INVALID_PARA, param); + } return; } if (pRes == NULL) { // here needs to commit all offsets. @@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); - tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); return code; } -int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ +void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ + int32_t code = 0; if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto end; } int32_t accId = tmq->pTscObj->acctId; @@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; - int32_t code = getClientVg(tmq, tname, vgId, &pVg); + code = getClientVg(tmq, tname, vgId, &pVg); if(code != 0){ taosWUnLockLatch(&tmq->lock); - return code; + goto end; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; code = checkWalRange(pOffsetInfo, offset); if (code != 0) { taosWUnLockLatch(&tmq->lock); - return code; + goto end; } taosWUnLockLatch(&tmq->lock); @@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param); - tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); - return code; +end: + if(code != 0 && cb != NULL){ + cb(tmq, code, param); + } } void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { @@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); } + tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position); return position; } @@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){ if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){ committed = pOffsetInfo->committedOffset.version; taosWUnLockLatch(&tmq->lock); - return committed; + goto end; } SEpSet epSet = pVg->epSet; taosWUnLockLatch(&tmq->lock); - return getCommittedFromServer(tmq, tname, vgId, &epSet); + committed = getCommittedFromServer(tmq, tname, vgId, &epSet); + +end: + tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed); + return committed; } int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, @@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { - code = TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_TMQ_INVALID_TOPIC; goto end; } @@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); + tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; @@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { return 0; } +// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); @@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->msgType = TDMT_VND_TMQ_SEEK; int64_t transporterId = 0; - tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, - tmq->consumerId, tname, vgId, tmq->epoch); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pParam->sem); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f23201a0621a4e3d4a0c1db45a2d2617228be210..6081b9a530852932d0ce33dee0a56369083b8e41 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); - mDebug("tq timer, rebalance counter old val:%d", old); + mInfo("tq timer, rebalance counter old val:%d", old); return old == 0; } @@ -116,7 +116,7 @@ void mndRebCntDec() { int32_t newVal = val - 1; int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); if (oldVal == val) { - mDebug("rebalance trans end, rebalance counter:%d", newVal); + mInfo("rebalance trans end, rebalance counter:%d", newVal); break; } } @@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { // rebalance cannot be parallel if (!mndRebTryStart()) { - mDebug("mq rebalance already in progress, do nothing"); + mInfo("mq rebalance already in progress, do nothing"); return 0; } @@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); @@ -362,7 +362,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { - mInfo("mq rebalance will be triggered"); + mInfo("mq rebalance will be triggered"); SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_DO_REBALANCE, .pCont = pRebMsg, @@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { for(int i = 0; i < taosArrayGetSize(req.topics); i++){ TopicOffsetRows* data = taosArrayGet(req.topics, i); - mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); + mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ @@ -1109,13 +1109,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * } if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { - mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId); + mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId); sdbRelease(pSdb, pConsumer); continue; } taosRLockLatch(&pConsumer->lock); - mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId); + mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId); int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); bool hasTopic = true; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f51a61eda38dc3d1d44d4b6ac251dcbf106f77eb..6bd23c3b902494d9b5f791bf3542401dc34caf52 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock int32_t numOfRows = 0; SMqSubscribeObj *pSub = NULL; - mDebug("mnd show subscriptions begin"); + mInfo("mnd show subscriptions begin"); while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); @@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock sdbRelease(pSdb, pSub); } - mDebug("mnd end show subscriptions"); + mInfo("mnd end show subscriptions"); pShow->numOfRows += numOfRows; return numOfRows; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8c9eead414eba3aa1d1371eb78e944b9a32675c6..89ed3ca1c7b8557fe1905fb32874a92632d54c65 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); + tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; taosWLockLatch(&pTq->lock); @@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return -1; } - tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, + tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = NULL; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index a839d6cbd8cf8f20a04462fa535241c8e1371c81..7ff7fe748e83429d6381ad00375a444cc3d465bb 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t fetchVer = pReader->curVersion; int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); - int64_t appliedVer = walGetAppliedVer(pReader->pWal); +// int64_t appliedVer = walGetAppliedVer(pReader->pWal); - if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] - wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); - } +// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] +// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); +// } - int64_t endVer = TMIN(appliedVer, committedVer); +// int64_t endVer = TMIN(appliedVer, committedVer); + int64_t endVer = committedVer; wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64", end index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + ", end index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer); if (fetchVer > endVer){ terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; @@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { pRead->pWal->vers.appliedVer); // TODO: valid ver - if (ver > pRead->pWal->vers.appliedVer) { - return -1; - } +// if (ver > pRead->pWal->vers.appliedVer) { +// return -1; +// } if (pRead->curVersion != ver) { code = walReaderSeekVer(pRead, ver);