From a28156e832d4298a15806fd7d6019ad0076e42a5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Feb 2023 12:35:50 +0800 Subject: [PATCH] refactor: do some internal refactor and add some logs for tmq. --- source/libs/executor/src/executor.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b3d3e8aab7..9611cdab67 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1048,15 +1048,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.returned = 0; + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { return 0; } if (subType == TOPIC_SUB_TYPE__COLUMN) { - uint16_t type = pOperator->operatorType; pOperator->status = OP_OPENED; + // TODO add more check - if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { ASSERT(pOperator->numOfDownstream == 1); pOperator = pOperator->pDownstream[0]; } @@ -1070,7 +1071,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ + // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one + // those data are from the snapshot in tsdb, besides the data in the wal file. int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; @@ -1129,7 +1131,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts, pTableScanInfo->currentTable, numOfTables); - /*}*/ } else { ASSERT(0); } @@ -1172,7 +1173,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; - qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; @@ -1180,7 +1181,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); return -1; } - qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); -- GitLab