diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 398a09ecbc33dbd81efc344539159fc001c9b308..d46f12cf77577e1412fe0c6242da70c624467030 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) { .reader = handle.execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .pMsgCb = &pTq->pVnode->msgCb, + .vnode = pTq->pVnode, }; handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); ASSERT(handle.execHandle.execCol.task[i]); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d83f482ca83670118ce9ef57a02efb554570b69e..50732b8e0d81ac3be2810c667a2866e9ccb76fa2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -154,6 +154,7 @@ static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArr static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData); +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -373,7 +374,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd initReaderStatus(&pReader->status); - pReader->pTsdb = pVnode->pTsdb; + pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr); pReader->suid = pCond->suid; pReader->order = pCond->order; pReader->capacity = 4096; @@ -2375,6 +2376,43 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } +STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr) { + if (VND_IS_RSMA(pVnode)) { + int level = 0; + int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); + + for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { + SRetention* pRetention = retentions + level; + if (pRetention->keep <= 0) { + if (level > 0) { + --level; + } + break; + } + if ((now - pRetention->keep) <= winSKey) { + break; + } + ++level; + } + + int32_t vgId = TD_VID(pVnode); + const char* str = (idStr != NULL)? idStr:""; + + if (level == TSDB_RETENTION_L0) { + tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str); + return VND_RSMA0(pVnode); + } else if (level == TSDB_RETENTION_L1) { + tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str); + return VND_RSMA1(pVnode); + } else { + tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str); + return VND_RSMA2(pVnode); + } + } + + return VND_TSDB(pVnode); +} + // // todo not unref yet, since it is not support multi-group interpolation query // static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) { // // filter the queried time stamp in the first place @@ -3280,8 +3318,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_ } } - ASSERT(0); - tsdbDebug("%p reset tsdbreader in query %s", pReader, numOfTables, pReader->idStr); + tsdbDebug("%p reset reader, suid:%"PRIu64", numOfTables:%d, query range:%"PRId64" - %"PRId64" in query %s", pReader, pReader->suid, + numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); return code; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 342638d12add12b20e8e9d6b4d079325adbbc2a7..6122c7715427f752957493f53dd75eba1c26cb61 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -305,9 +305,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond; if (pScanNode->tableType == TSDB_SUPER_TABLE) { if (pTagIndexCond) { - SIndexMetaArg metaArg = { - .metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid}; - SArray* res = taosArrayInit(8, sizeof(uint64_t)); // code = doFilterTag(pTagIndexCond, &metaArg, res); code = TSDB_CODE_INDEX_REBUILDING; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 007c61afc3ab37fc416593e1cd519189a98bd3cd..fc9b480233969133d4e94d6a81b78ffc63457a80 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -138,7 +138,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); - int64_t st = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); uint64_t el = (taosGetTimestampUs() - st);