diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 60463bad5f910488320e259a76e68ab35417a70f..656ffda0cad2140ee0a6ccdfe048caea0801763b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1145,13 +1145,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start + // position, let's find it from the beginning. + index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { pScanInfo->currentTable = index; } else { - qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id); + qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, + numOfTables, pScanInfo->currentTable, id); return -1; } @@ -1160,25 +1163,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // let's start from the next ts that returned to consumer. pScanBaseInfo->cond.twindows.skey = ts + 1; + pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, - pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); + pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); terrno = code; return -1; } - qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s", - uid, ts, pScanInfo->currentTable, numOfTables, id); + qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); - pScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, ts, pScanInfo->currentTable, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c13366c5609e44cc9de824edc973e9c6d9722a9e..84317c825b0e1adbc8aa62103ef3e3780c083848 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -779,13 +779,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if no data, switch to next table and continue scan pInfo->currentTable++; if (pInfo->currentTable >= numOfTables) { + qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); return NULL; } STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables, - pInfo->currentTable, numOfTables, pTaskInfo->id.str); + pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 0fa9bcfbd461c0498d301e7b702a2ee3f8dd8aa4..c47c218891a5d0f434e3795e7137d625e224f1c2 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -13,11 +13,11 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - #rpcDebugFlagVal = '143' + rpcDebugFlagVal = '143' #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal #print ("===================: ", updatecfgDict) def init(self, conn, logSql, replicaVar=1): diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py index 87832ac0ef2169ad2895928ceb20121ae17ecba9..11fc7dbcc0587b20fd65bc71047ba04c1adb3f91 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py @@ -16,6 +16,8 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + updatecfgDict = {"tsdbDebugFlag":135} + def __init__(self): self.vgroups = 4 self.ctbNum = 10