From 4f107908db680d2d2436e926c93ac4b71711ed99 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 30 Jun 2022 19:02:43 +0800 Subject: [PATCH] fix(tmq): reset window --- source/client/src/tmq.c | 3 ++- source/common/src/tmsg.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 23 +++++++++-------------- source/libs/executor/src/executorimpl.c | 10 ++++------ source/libs/executor/src/scanoperator.c | 2 -- tests/script/tsim/tmq/snapshot.sim | 9 +++------ tests/script/tsim/tmq/snapshot1.sim | 6 +++--- 7 files changed, 23 insertions(+), 34 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index aa78649ba7..bea9d215da 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { - /*printf("call poll\n");*/ + /*tscDebug("call poll");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { + /*tscDebug("call poll1");*/ void* rspObj; int64_t startTime = taosGetTimestampMs(); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 78eccdf0cd..80402439fc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5398,9 +5398,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } else if (pVal->type == TMQ_OFFSET__LOG) { snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { - snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); + snprintf(buf, maxLen, "offset(ss data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); + snprintf(buf, maxLen, "offset(ss meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c0d184ee11..d80996b399 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con }; tmsgSendRsp(&rsp); - char buf1[50]; - char buf2[50]; - tFormatOffset(buf1, 50, &pRsp->reqOffset); - tFormatOffset(buf2, 50, &pRsp->rspOffset); + char buf1[80]; + char buf2[80]; + tFormatOffset(buf1, 80, &pRsp->reqOffset); + tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); @@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqOffsetVal fetchOffsetNew; // 1.find handle - char buf[50]; - tFormatOffset(buf, 50, &reqOffset); + char buf[80]; + tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, TD_VID(pTq->pVnode), buf); @@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; metaRsp.reqOffset = pReq->reqOffset.version; - /*tqOffsetResetToLog(&metaR)*/ metaRsp.rspOffset = fetchVer; metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; @@ -380,18 +379,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // 2. get data (rebuild reader if needed) // 3. get new uid and ts - char formatBuf[50]; - tFormatOffset(formatBuf, 50, &dataRsp.reqOffset); - tqInfo("retrieve using snapshot req offset %s", formatBuf); + tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts); if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { ASSERT(0); } // 4. send rsp - if (dataRsp.blockNum != 0) { - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; } } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { ASSERT(0); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index dd48229849..efa8887ed0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2836,12 +2836,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { tsdbSetTableId(pInfo->dataReader, uid); - SQueryTableDataCond tmpCond = pInfo->cond; - tmpCond.twindows[0] = (STimeWindow){ - .skey = ts, - .ekey = INT64_MAX, - }; - tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); + int64_t oldSkey = pInfo->cond.twindows[0].skey; + pInfo->cond.twindows[0].skey = ts; + tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + pInfo->cond.twindows[0].skey = oldSkey; pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a6bf9a12f3..a3af4ab223 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - // check status while (1) { SSDataBlock* result = doTableScanGroup(pOperator); if (result) { @@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); - /*pTableInfo->uid */ tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); pInfo->scanTimes = 0; diff --git a/tests/script/tsim/tmq/snapshot.sim b/tests/script/tsim/tmq/snapshot.sim index 5683aaa559..de0468e6f2 100644 --- a/tests/script/tsim/tmq/snapshot.sim +++ b/tests/script/tsim/tmq/snapshot.sim @@ -111,7 +111,7 @@ endi $consumerId = 0 $totalMsgOfStb = $ctbNum * $rowsPerCtb -$expectmsgcnt = 1 +$expectmsgcnt = 1000000 $expectrowcnt = 100 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) @@ -131,9 +131,6 @@ endi if $data[0][1] != $consumerId then return -1 endi -if $data[0][2] != $expectmsgcnt then - return -1 -endi if $data[0][3] != $expectrowcnt then return -1 endi @@ -183,7 +180,7 @@ endi $consumerId = 0 $totalMsgOfCtb = $rowsPerCtb -$expectmsgcnt = 1 +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from ctb @@ -254,7 +251,7 @@ endi $consumerId = 0 $totalMsgOfNtb = $rowsPerCtb -$expectmsgcnt = $totalMsgOfNtb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from ntb diff --git a/tests/script/tsim/tmq/snapshot1.sim b/tests/script/tsim/tmq/snapshot1.sim index 9226c475cb..d534bb68da 100644 --- a/tests/script/tsim/tmq/snapshot1.sim +++ b/tests/script/tsim/tmq/snapshot1.sim @@ -80,7 +80,7 @@ $topicList = $topicList . ' $consumerId = 0 $totalMsgOfOneTopic = $ctbNum * $rowsPerCtb $totalMsgOfStb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfStb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) @@ -168,7 +168,7 @@ $consumerId = 0 $totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfCtb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) $topicList = ' . topic_ctb_function @@ -245,7 +245,7 @@ $topicList = $topicList . ' $consumerId = 0 $totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfNtb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) -- GitLab