From c3be7b14bed50e8f680bd98fcb381c405cce9659 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 May 2023 17:51:03 +0800 Subject: [PATCH] fix: set correct function ptr. --- include/libs/executor/executor.h | 1 - include/libs/executor/storageapi.h | 42 +++++++++++----------- source/dnode/snode/inc/sndInt.h | 2 ++ source/dnode/snode/src/snode.c | 4 +-- source/dnode/vnode/inc/vnode.h | 4 ++- source/dnode/vnode/src/meta/metaQuery.c | 4 +-- source/dnode/vnode/src/tq/tqRead.c | 3 ++ source/dnode/vnode/src/vnd/vnodeInitApi.c | 12 +++++++ source/dnode/vnode/src/vnd/vnodeQuery.c | 8 +++++ source/libs/executor/src/executil.c | 6 ++-- source/libs/executor/src/executor.c | 15 ++++---- source/libs/executor/src/operator.c | 4 +-- source/libs/executor/src/querytask.c | 2 +- source/libs/executor/src/scanoperator.c | 42 +++++++++++----------- source/libs/executor/src/sysscanoperator.c | 20 +++++------ 15 files changed, 98 insertions(+), 71 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index c4c2c89fdb..e90bb4688b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -42,7 +42,6 @@ typedef struct { typedef struct { void* tqReader; -// void* meta; void* config; void* vnode; void* mnd; diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 1240f536ae..5f7a91c057 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -84,7 +84,7 @@ typedef struct SMetaReader { SMetaEntry me; void * pBuf; int32_t szBuf; - struct SStorageAPI *storageAPI; + struct SStoreMeta* pAPI; } SMetaReader; typedef struct SMTbCursor { @@ -256,7 +256,7 @@ typedef struct SStoreCacheReader { void *(*closeReader)(void *pReader); int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUidList); - void (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables); + int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables); } SStoreCacheReader; /*------------------------------------------------------------------------------------------------------------------*/ @@ -290,6 +290,7 @@ typedef struct SStoreTqReader { int32_t (*tqRetrieveBlock)(); bool (*tqReaderNextBlockInWal)(); bool (*tqNextBlockImpl)(); // todo remove it + SSDataBlock* (*tqGetResultBlock)(); void (*tqReaderSetColIdList)(); int32_t (*tqReaderSetQueryTableList)(); @@ -345,14 +346,6 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); */ -typedef struct SStoreMetaReader { - void (*initReader)(SMetaReader *pReader, void *pMeta, int32_t flags); - void (*clearReader)(SMetaReader *pReader); - void (*readerReleaseLock)(SMetaReader *pReader); - int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid); - int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name); - int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); -} SStoreMetaReader; typedef struct SStoreMeta { SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor @@ -387,7 +380,7 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in void *(*storeGetIndexInfo)(); void *(*getInvertIndex)(void* pVnode); int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*storeGetTableList)(); // vnodeGetStbIdList & vnodeGetAllTableList + int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList void *storeGetVersionRange; void *storeGetLastTimestamp; @@ -405,9 +398,14 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); */ } SStoreMeta; - - - +typedef struct SStoreMetaReader { + void (*initReader)(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); + void (*clearReader)(SMetaReader *pReader); + void (*readerReleaseLock)(SMetaReader *pReader); + int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid); + int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name); + int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); +} SStoreMetaReader; typedef struct SUpdateInfo { SArray *pTsBuckets; @@ -507,14 +505,14 @@ typedef struct SStateStore { } SStateStore; typedef struct SStorageAPI { - SStoreMeta metaFn; // todo: refactor - TsdReader tsdReader; - SStoreMetaReader metaReaderFn; - SStoreCacheReader cacheFn; - SStoreSnapshotFn snapshotFn; - SStoreTqReader tqReaderFn; - SStateStore stateStore; - SMetaDataFilterAPI metaFilter; + SStoreMeta metaFn; // todo: refactor + TsdReader tsdReader; + SStoreMetaReader metaReaderFn; + SStoreCacheReader cacheFn; + SStoreSnapshotFn snapshotFn; + SStoreTqReader tqReaderFn; + SStateStore stateStore; + SMetaDataFilterAPI metaFilter; SFunctionStateStore functionStore; } SStorageAPI; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 3fcee862a1..68f7f756d5 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -53,6 +53,8 @@ int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId); #endif +void initStreamStateAPI(SStorageAPI* pAPI); + #ifdef __cplusplus } #endif diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 24e2b13f46..678dd34e4a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -62,8 +62,7 @@ FAIL: } int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { - ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); - ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); + ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -88,6 +87,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo); SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState }; + initStreamStateAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2723e869ff..e8ae545ea0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,6 +69,7 @@ int64_t vnodeGetSyncHandle(SVnode *pVnode); void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); +int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeIsCatchUp(SVnode *pVnode); ESyncRole vnodeGetRole(SVnode *pVnode); @@ -105,7 +106,7 @@ typedef struct SMetaEntry SMetaEntry; #define META_READER_NOLOCK 0x1 -void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags); +void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); @@ -257,6 +258,7 @@ int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); bool tqNextBlockInWal(STqReader *pReader, const char *idstr); bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader* tqGetWalReader(STqReader* pReader); +SSDataBlock* tqGetResultBlock (STqReader* pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 255fa10f57..7eb6460af5 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -17,14 +17,14 @@ #include "osMemory.h" #include "tencode.h" -void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags) { +void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) { SMeta* pMeta = ((SVnode*)pVnode)->pMeta; metaReaderInit(pReader, pMeta, flags); + pReader->pAPI = pAPI; } void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { memset(pReader, 0, sizeof(*pReader)); - pReader->flags = flags; pReader->pMeta = pMeta; if (pReader->pMeta && !(flags & META_READER_NOLOCK)) { metaRLock(pMeta); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2521d956a3..dbb7d564f8 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -446,6 +446,9 @@ SWalReader* tqGetWalReader(STqReader* pReader) { return pReader->pWalReader; } +SSDataBlock* tqGetResultBlock (STqReader* pReader) { + return pReader->pResBlock; +} bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { if (pReader->msg.msgStr == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 923e25f73b..5317c0e675 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -25,6 +25,7 @@ static void initStateStoreAPI(SStateStore* pStore); static void initMetaReaderAPI(SStoreMetaReader* pMetaReader); static void initMetaFilterAPI(SMetaDataFilterAPI* pFilter); static void initFunctionStateStore(SFunctionStateStore* pStore); +static void initCacheFn(SStoreCacheReader* pCache); void initStorageAPI(SStorageAPI* pAPI) { initTsdbReaderAPI(&pAPI->tsdReader); @@ -34,6 +35,7 @@ void initStorageAPI(SStorageAPI* pAPI) { initMetaReaderAPI(&pAPI->metaReaderFn); initMetaFilterAPI(&pAPI->metaFilter); initFunctionStateStore(&pAPI->functionStore); + initCacheFn(&pAPI->cacheFn); } void initTsdbReaderAPI(TsdReader* pReader) { @@ -83,6 +85,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getTableNameByUid = metaGetTableNameByUid; pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor + pMeta->storeGetTableList = vnodeGetTableList; } void initTqAPI(SStoreTqReader* pTq) { @@ -109,6 +112,8 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it + pTq->tqGetResultBlock = tqGetResultBlock; + pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; } @@ -213,4 +218,11 @@ void initMetaFilterAPI(SMetaDataFilterAPI* pFilter) { void initFunctionStateStore(SFunctionStateStore* pStore) { pStore->streamStateFuncPut = streamStateFuncPut; pStore->streamStateFuncGet = streamStateFuncGet; +} + +void initCacheFn(SStoreCacheReader* pCache) { + pCache->openReader = tsdbCacherowsReaderOpen; + pCache->closeReader = tsdbCacherowsReaderClose; + pCache->retrieveRows = tsdbRetrieveCacheRows; + pCache->reuseReader = tsdbReuseCacherowsReader; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index ac3e632172..022fc4c951 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -431,6 +431,14 @@ void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* num } } +int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList) { + if (type == TSDB_SUPER_TABLE) { + return vnodeGetStbIdList(pVnode, 0, pList); + } else { + return TSDB_CODE_INVALID_PARA; + } +} + int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid, 1); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index ebe3d69dbb..562a6daf8a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -263,7 +263,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { STagVal tagVal = {0}; tagVal.cid = pSColumnNode->colId; - const char* p = mr->storageAPI->metaFn.extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal); + const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal); if (p == NULL) { res->node.resType.type = TSDB_DATA_TYPE_NULL; } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) { @@ -306,7 +306,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, int32_t code = TSDB_CODE_SUCCESS; SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, metaHandle, 0); + pAPI->metaReaderFn.initReader(&mr, metaHandle, 0, &pAPI->metaFn); code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid); if (TSDB_CODE_SUCCESS != code) { pAPI->metaReaderFn.clearReader(&mr); @@ -1168,7 +1168,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, SStorageAPI* pAPI) { SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pVnode, 0); + pAPI->metaReaderFn.initReader(&mr, pVnode, 0, &pAPI->metaFn); if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) { // table not exist pAPI->metaReaderFn.clearReader(&mr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 53fad6503e..14f5c3f4fa 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -341,7 +341,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0, &pAPI->metaFn); for (int32_t i = 0; i < numOfUids; ++i) { uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i); @@ -1091,12 +1091,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - ASSERT(0); -// walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset); -// if (tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) { -// qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id); -// return -1; -// } + SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn; + SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader); + walReaderVerifyOffset(pWalReader, pOffset); + if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) { + qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id); + return -1; + } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one // those data are from the snapshot in tsdb, besides the data in the wal file. diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 8521fb623e..935e6a0b8b 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -381,9 +381,9 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); - int32_t code = pTaskInfo->storageAPI.metaFn.storeGetTableList(pHandle->vnode, pBlockNode->uid, pList); + int32_t code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList); if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = terrno; + pTaskInfo->code = code; return NULL; } diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index f79966a021..22d171e74a 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -122,7 +122,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo SStorageAPI* pAPI = &pTaskInfo->storageAPI; - pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0); + pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pAPI->metaFn); int32_t code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 66e200c81b..8f11cd769a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -531,7 +531,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // 1. check if it is existed in meta cache if (pCache == NULL) { - pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn); code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed. @@ -560,7 +560,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); if (h == NULL) { - pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn); code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { @@ -1330,9 +1330,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - int64_t version = pSrcBlock->info.version - 1; + int64_t ver = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { - uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version); + uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver); // gap must be 0. SSessionKey startWin = {0}; getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin); @@ -1378,13 +1378,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; - int64_t version = pSrcBlock->info.version - 1; + int64_t ver = pSrcBlock->info.version - 1; if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) { uint64_t srcUid = srcUidData[0]; TSKEY startTs = srcStartTsCol[0]; TSKEY endTs = srcEndTsCol[0]; - SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version); + SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver); printDataBlock(pPreRes, "pre res"); blockDataCleanup(pSrcBlock); int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows); @@ -1422,7 +1422,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; if (groupId == 0) { - groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version); + groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver); } TSKEY calStartTs = srcStartTsCol[i]; colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false); @@ -1459,13 +1459,13 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; - int64_t version = pSrcBlock->info.version - 1; + int64_t ver = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0}; if (groupId == 0) { - groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version); + groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver); } if (pInfo->tbnameCalSup.pExprInfo) { void* parTbname = NULL; @@ -1676,7 +1676,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (1) { bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); - SSDataBlock* pRes = NULL; + SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); // curVersion move to next, so currentOffset = curVersion - 1 @@ -2558,7 +2558,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); @@ -3185,6 +3185,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo)); return code; } + supp->dbNameSlotId = -1; supp->stbNameSlotId = -1; supp->tbCountSlotId = -1; @@ -3194,6 +3195,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo)); return code; } + code = tblCountScanGetCountSlotId(pseudoCols, supp); if (code != TSDB_CODE_SUCCESS) { qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo)); @@ -3378,7 +3380,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca if (pSupp->groupByStbName) { if (pInfo->stbUidList == NULL) { pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t)); - if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, 0, pInfo->stbUidList, TSDB_SUPER_TABLE) < 0) { + if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList) < 0) { qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr()); } } @@ -3412,7 +3414,7 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO if (strlen(pSupp->dbNameFilter) != 0) { if (strlen(pSupp->stbNameFilter) != 0) { - tb_uid_t uid = 0; + uint64_t uid = 0; pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid); int64_t numOfChildTables = 0; @@ -3420,7 +3422,8 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes); } else { - int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode); + int64_t tbNumVnode = 0; + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tbNumVnode, NULL); fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); } } else { @@ -3444,6 +3447,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode); pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables); + if (numOfTables != 0) { fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes); } @@ -3456,18 +3460,16 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S char fullStbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pSupp->groupByDbName) { - snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName); + snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, varDataVal(stbName)); } else { - snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName); + snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", varDataVal(stbName)); } uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName)); pRes->info.id.groupId = groupId; - SMetaStbStats stats = {0}; -// metaGetStbStats(pInfo->readHandle.vnode, stbUid, &stats); - int64_t ctbNum = stats.ctbNum; - + int64_t ctbNum = 0; + int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum); fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 40b45c25cd..fd766e21f8 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -466,7 +466,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, pInfo->req.filterTb); SMetaReader smrTable = {0}; - pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -486,7 +486,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { if (smrTable.me.type == TSDB_CHILD_TABLE) { int64_t suid = smrTable.me.ctbEntry.suid; pAPI->metaReaderFn.clearReader(&smrTable); - pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -569,7 +569,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { schemaRow = *(SSchemaWrapper**)schema; } else { SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -658,7 +658,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, condTableName); SMetaReader smrChildTable = {0}; - pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrChildTable, condTableName); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -676,7 +676,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByUid @@ -715,7 +715,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { @@ -1131,7 +1131,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { tb_uid_t* uid = taosArrayGet(pIdx->uids, i); SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn); ret = pAPI->metaReaderFn.getTableEntryByUid(&mr, *uid); if (ret < 0) { pAPI->metaReaderFn.clearReader(&mr); @@ -1159,7 +1159,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false); SMetaReader mr1 = {0}; - pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK); + pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); int64_t suid = mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr1, suid); @@ -1338,7 +1338,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false); SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid); @@ -2148,7 +2148,7 @@ static int32_t doGetTableRowSize(SReadHandle *pHandle, uint64_t uid, int32_t* ro *rowLen = 0; SMetaReader mr = {0}; - pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn); int32_t code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, uid); if (code != TSDB_CODE_SUCCESS) { qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr); -- GitLab