From 96b93266e5a803d1fe1afad4330443a765408105 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 4 Aug 2023 15:34:25 +0800 Subject: [PATCH] fix:[TS-3756] assignment is same even though there are data --- source/client/src/clientTmq.c | 4 ++-- source/client/test/clientTests.cpp | 37 ++++++++++++++++++++++-------- source/dnode/vnode/src/tq/tqUtil.c | 15 +++++++----- source/libs/wal/src/walRead.c | 3 ++- 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5879de2e30..6c30577c74 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2119,7 +2119,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { - tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); + tscInfo("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); @@ -2878,7 +2878,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->msgType = TDMT_VND_TMQ_SEEK; int64_t transporterId = 0; - tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, + tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d", tmq->consumerId, pTopic->topicName, vgId, tmq->epoch); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); taosWUnLockLatch(&tmq->lock); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index a2cda0dcf9..8a728642dc 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1088,8 +1088,8 @@ TEST(clientCase, td_25129) { tmq_conf_set(conf, "group.id", "group_id_2"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "auto.offset.reset", "earliest"); - tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "auto.offset.reset", "latest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); @@ -1107,7 +1107,7 @@ TEST(clientCase, td_25129) { int32_t precision = 0; int32_t totalRows = 0; int32_t msgCnt = 0; - int32_t timeout = 2000; + int32_t timeout = 200; int32_t count = 0; @@ -1177,7 +1177,10 @@ TEST(clientCase, td_25129) { printSubResults(pRes, &totalRows); - code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + tmq_topic_assignment* pAssignTmp = NULL; + int32_t numOfAssignTmp = 0; + + code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp); if (code != 0) { printf("error occurs:%s\n", tmq_err2str(code)); tmq_free_assignment(pAssign); @@ -1188,12 +1191,28 @@ TEST(clientCase, td_25129) { } for(int i = 0; i < numOfAssign; i++){ - printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end); } + if(numOfAssign != 0){ + int i = 0; + for(; i < numOfAssign; i++){ + if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){ + break; + } + } + if(i == numOfAssign){ + printf("all position is same\n"); + break; + } + tmq_free_assignment(pAssign); + } + numOfAssign = numOfAssignTmp; + pAssign = pAssignTmp; + } else { - tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); - tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); - tmq_commit_sync(tmq, pRes); +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); +// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); +// tmq_commit_sync(tmq, pRes); continue; } @@ -1204,7 +1223,7 @@ TEST(clientCase, td_25129) { // break; // } } else { - break; +// break; } // tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8948bae852..2e128f144d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -157,9 +157,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ +static void setRequestVersion(STqOffsetVal* offset, int64_t verStart, int64_t verEnd){ if(offset->type == TMQ_OFFSET__LOG){ - offset->version = ver + 1; + offset->version = verStart + 1; + } + if(offset->version > verEnd){ + offset->version = verEnd; } } @@ -192,7 +195,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } taosWUnLockLatch(&pTq->lock); } - setRequestVersion(&dataRsp.reqOffset, pOffset->version); + setRequestVersion(&dataRsp.reqOffset, pOffset->version, dataRsp.rspOffset.version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); end : { @@ -267,7 +270,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - setRequestVersion(&taosxRsp.reqOffset, offset->version); + setRequestVersion(&taosxRsp.reqOffset, offset->version, taosxRsp.rspOffset.version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } @@ -280,7 +283,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); - setRequestVersion(&taosxRsp.reqOffset, offset->version); + setRequestVersion(&taosxRsp.reqOffset, offset->version, taosxRsp.rspOffset.version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } @@ -310,7 +313,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - setRequestVersion(&taosxRsp.reqOffset, offset->version); + setRequestVersion(&taosxRsp.reqOffset, offset->version, taosxRsp.rspOffset.version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } else { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 786f48ce88..8b3eb4f664 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -76,7 +76,8 @@ int32_t walNextValidMsg(SWalReader *pReader) { wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); } - int64_t endVer = TMIN(appliedVer, committedVer); + int64_t endVer = committedVer; +// int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", end index:%" PRId64, -- GitLab