From e1d85ce0747f465d79f49ffd07ddee89de854f96 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Mar 2023 12:04:47 +0800 Subject: [PATCH] fix(tmq): fix error. --- source/libs/executor/src/executor.c | 18 ++++++++++-------- source/libs/executor/src/scanoperator.c | 3 ++- tests/system-test/7-tmq/subscribeDb.py | 4 ++-- .../7-tmq/tmqConsFromTsdb-mutilVg.py | 2 ++ 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 60463bad5f..656ffda0ca 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1145,13 +1145,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start + // position, let's find it from the beginning. + index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { pScanInfo->currentTable = index; } else { - qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id); + qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, + numOfTables, pScanInfo->currentTable, id); return -1; } @@ -1160,25 +1163,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // let's start from the next ts that returned to consumer. pScanBaseInfo->cond.twindows.skey = ts + 1; + pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, - pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); + pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); terrno = code; return -1; } - qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s", - uid, ts, pScanInfo->currentTable, numOfTables, id); + qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); - pScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, ts, pScanInfo->currentTable, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c13366c560..84317c825b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -779,13 +779,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if no data, switch to next table and continue scan pInfo->currentTable++; if (pInfo->currentTable >= numOfTables) { + qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); return NULL; } STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables, - pInfo->currentTable, numOfTables, pTaskInfo->id.str); + pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 0fa9bcfbd4..c47c218891 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -13,11 +13,11 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - #rpcDebugFlagVal = '143' + rpcDebugFlagVal = '143' #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal #print ("===================: ", updatecfgDict) def init(self, conn, logSql, replicaVar=1): diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py index 87832ac0ef..11fc7dbcc0 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py @@ -16,6 +16,8 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + updatecfgDict = {"tsdbDebugFlag":135} + def __init__(self): self.vgroups = 4 self.ctbNum = 10 -- GitLab