From 94f07836d605d8d51b9c473ae3a5de2bbb4cabd4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 30 Jun 2022 17:25:19 +0800 Subject: [PATCH] fix(tmq): set scan rsp offset correctly --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/tq/tq.c | 1 - source/dnode/vnode/src/tq/tqExec.c | 11 ++++---- source/libs/executor/inc/executorimpl.h | 7 +---- source/libs/executor/src/executorMain.c | 8 +++++- source/libs/executor/src/executorimpl.c | 36 ++++++++++++++++++------- source/libs/executor/src/scanoperator.c | 29 -------------------- tests/script/tsim/tmq/snapshot1.sim | 2 ++ 8 files changed, 44 insertions(+), 52 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d3e3c58622..45fa94b3bf 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -172,7 +172,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le */ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); -int32_t qPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0563b67f4a..c0d184ee11 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -389,7 +389,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // 4. send rsp if (dataRsp.blockNum != 0) { - tqOffsetResetToData(&dataRsp.rspOffset, 0, 0); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 09cbed7392..9dd2a0258f 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -63,10 +63,11 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); qTaskInfo_t task = pExec->execCol.task[workerId]; - if (qStreamScanSnapshot(task) < 0) { - ASSERT(0); - } - if (qPrepareScan(task, offset.uid, offset.ts) < 0) { + /*if (qStreamScanSnapshot(task) < 0) {*/ + /*ASSERT(0);*/ + /*}*/ + + if (qStreamPrepareScan(task, offset.uid, offset.ts) < 0) { ASSERT(0); } @@ -86,7 +87,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S if (pRsp->withTbName) { pRsp->withTbName = 0; -#if 1 +#if 0 int64_t uid; int64_t ts; if (qGetStreamScanStatus(task, &uid, &ts) < 0) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 618cf50b86..9872f26b03 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -250,7 +250,7 @@ typedef struct SSampleExecInfo { enum { TABLE_SCAN__TABLE_ORDER = 1, - TABLE_SCAN__BLOCK_ORDER = 1, + TABLE_SCAN__BLOCK_ORDER = 2, }; typedef struct STableScanInfo { @@ -286,11 +286,6 @@ typedef struct STableScanInfo { int64_t ts; } lastStatus; - struct { - uint64_t uid; - int64_t ts; - } expStatus; - int8_t scanMode; } STableScanInfo; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index deb4f970df..8dd8005225 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -231,9 +231,15 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le return decodeOperator(pTaskInfo->pRoot, pInput, len); } -int32_t qPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + if (uid == 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } + return doPrepareScan(pTaskInfo->pRoot, uid, ts); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6fc1d2b0b4..dd48229849 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2825,18 +2825,36 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamBlockScanInfo* pScanInfo = pOperator->info; - STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; - /**uid = pSnapShotScanInfo->scanStatus.uid;*/ - /**ts = pSnapShotScanInfo->scanStatus.t;*/ - if (pSnapShotScanInfo->lastStatus.uid != uid || pSnapShotScanInfo->lastStatus.ts != ts) { - // rebuild scan - // + pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; + + STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; + + /*if (pSnapShotScanInfo->dataReader == NULL) {*/ + /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/ + /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/ + /*}*/ + + if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { + tsdbSetTableId(pInfo->dataReader, uid); + SQueryTableDataCond tmpCond = pInfo->cond; + tmpCond.twindows[0] = (STimeWindow){ + .skey = ts, + .ekey = INT64_MAX, + }; + tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); + pInfo->scanTimes = 0; + pInfo->curTWinIdx = 0; } + } else { - if (pOperator->pDownstream[0] == NULL) { - return TSDB_CODE_INVALID_PARA; + if (pOperator->numOfDownstream == 1) { + return doPrepareScan(pOperator->pDownstream[0], uid, ts); + } else if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block"); + return TSDB_CODE_QRY_APP_ERROR; } else { - doPrepareScan(pOperator->pDownstream[0], uid, ts); + qError("join not supported for stream block scan"); + return TSDB_CODE_QRY_APP_ERROR; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 10bd0520ad..a6bf9a12f3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -519,35 +519,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { // check status - if (pInfo->lastStatus.uid == pInfo->expStatus.uid && pInfo->lastStatus.ts == pInfo->expStatus.ts) { - while (1) { - SSDataBlock* result = doTableScanGroup(pOperator); - if (result) { - return result; - } - // if no data, switch to next table and continue scan - pInfo->currentTable++; - if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) { - return NULL; - } - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); - /*pTableInfo->uid */ - tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); - tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); - pInfo->scanTimes = 0; - pInfo->curTWinIdx = 0; - } - } - // reset to exp table and window start from ts - tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid); - SQueryTableDataCond tmpCond = pInfo->cond; - tmpCond.twindows[0] = (STimeWindow){ - .skey = pInfo->expStatus.ts, - .ekey = INT64_MAX, - }; - tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); - pInfo->scanTimes = 0; - pInfo->curTWinIdx = 0; while (1) { SSDataBlock* result = doTableScanGroup(pOperator); if (result) { diff --git a/tests/script/tsim/tmq/snapshot1.sim b/tests/script/tsim/tmq/snapshot1.sim index 8c4a719006..9226c475cb 100644 --- a/tests/script/tsim/tmq/snapshot1.sim +++ b/tests/script/tsim/tmq/snapshot1.sim @@ -133,6 +133,8 @@ endi $totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb $sumOfRows = $data[0][3] + $data[1][3] if $sumOfRows != $totalMsgCons then + print actual: $sumOfRows + print expect: $totalMsgCons return -1 endi -- GitLab