提交 ca1c961f 编写于 作者: L Liu Jicong

fix(tmq): set last scan status

上级 8dece648
......@@ -85,12 +85,15 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
pRsp->withTbName = 0;
#if 1
int64_t uid;
int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
#endif
}
pRsp->blockNum++;
......
......@@ -392,6 +392,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
binfo.capacity = binfo.rows;
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info = binfo;
ASSERT(binfo.uid != 0);
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
......@@ -419,6 +420,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTableScanInfo->lastStatus.uid = pBlock->info.uid;
pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.uid != 0);
return pBlock;
}
return NULL;
......@@ -438,6 +440,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
ASSERT(p->info.uid != 0);
return p;
}
pTableScanInfo->curTWinIdx += 1;
......@@ -517,6 +520,35 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
// check status
if (pInfo->lastStatus.uid == pInfo->expStatus.uid && pInfo->lastStatus.ts == pInfo->expStatus.ts) {
while (1) {
SSDataBlock* result = doTableScanGroup(pOperator);
if (result) {
return result;
}
// if no data, switch to next table and continue scan
pInfo->currentTable++;
if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) {
return NULL;
}
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
/*pTableInfo->uid */
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
}
}
// reset to exp table and window start from ts
tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid);
SQueryTableDataCond tmpCond = pInfo->cond;
tmpCond.twindows[0] = (STimeWindow){
.skey = pInfo->expStatus.ts,
.ekey = INT64_MAX,
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
while (1) {
SSDataBlock* result = doTableScanGroup(pOperator);
if (result) {
return result;
......@@ -532,23 +564,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
pInfo->lastStatus.ts = pInfo->expStatus.ts;
pInfo->lastStatus.uid = pInfo->expStatus.uid;
return doTableScan(pOperator);
}
// reset to exp table and window start from ts
tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid);
SQueryTableDataCond tmpCond = pInfo->cond;
tmpCond.twindows[0] = (STimeWindow){
.skey = pInfo->expStatus.ts,
.ekey = INT64_MAX,
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
pInfo->lastStatus.ts = pInfo->expStatus.ts;
pInfo->lastStatus.uid = pInfo->expStatus.uid;
return doTableScan(pOperator);
}
if (pInfo->currentGroupId == -1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册