From a9b18d203daa2299aaa546723d86606f9f117ccc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 24 May 2023 23:49:26 +0800 Subject: [PATCH] fix:fix error. --- include/libs/executor/dataSinkMgt.h | 2 +- source/dnode/vnode/inc/vnode.h | 4 ++-- source/dnode/vnode/src/meta/metaQuery.c | 12 ++++++------ source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +++--- source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 ++ source/libs/executor/inc/dataSinkInt.h | 4 ++-- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/dataInserter.c | 2 +- source/libs/executor/src/dataSinkMgt.c | 3 ++- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/scanoperator.c | 9 +++++---- 11 files changed, 26 insertions(+), 22 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index ce7d038d42..0a9037d21c 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg { uint32_t maxDataBlockNumPerQuery; } SDataSinkMgtCfg; -int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg); +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI); typedef struct SInputData { const struct SSDataBlock* pData; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 4e3e8a6a8d..2723e869ff 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -118,7 +118,7 @@ const void *metaGetTableTagVal(const void *tag, int16_t type, STagVal *tagVal); int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); -int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid); +int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); bool metaIsTableExist(void* pVnode, tb_uid_t uid); int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, @@ -192,7 +192,7 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); -int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); +int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 617a3a2c08..255fa10f57 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -139,10 +139,10 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { return uid; } -int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) { +int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, (SMeta *)meta, 0); + metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); @@ -170,10 +170,10 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) { return 0; } -int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) { +int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, (SMeta *)meta, 0); + metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0); SMetaReader *pReader = &mr; @@ -191,10 +191,10 @@ int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) { return 0; } -int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) { +int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, (SMeta *)meta, 0); + metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); code = metaGetTableEntryByName(&mr, tbName); if (code == 0) *tbType = mr.me.type; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 414ccfc8b2..62b121e4d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -5373,9 +5373,9 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { return rows; } -int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { +int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); + metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -5405,7 +5405,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 metaReaderClear(&mr); // get the newest table schema version - code = metaGetTbTSchemaEx(pVnode->pMeta, *suid, uid, -1, pSchema); + code = metaGetTbTSchemaEx(((SVnode*)pVnode)->pMeta, *suid, uid, -1, pSchema); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 34c6da1398..70cff5e68a 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -78,6 +78,8 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getTableUidByName = metaGetTableUidByName; pMeta->getTableTypeByName = metaGetTableTypeByName; pMeta->getTableNameByUid = metaGetTableNameByUid; + + pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor } void initTqAPI(SStoreTqReader* pTq) { diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 3255831476..9893b4eb76 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -29,8 +29,8 @@ struct SDataSink; struct SDataSinkHandle; typedef struct SDataSinkManager { - SDataSinkMgtCfg cfg; - SStorageAPI storeFn; + SDataSinkMgtCfg cfg; + SStorageAPI* pAPI; } SDataSinkManager; typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index eb585e7046..8188ecd305 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -224,7 +224,7 @@ typedef struct STableScanInfo { int8_t assignBlockUid; bool hasGroupByTag; bool countOnly; - TsdReader readerAPI; +// TsdReader readerAPI; } STableScanInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index aadbf1f1a9..646964ebf4 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -429,7 +429,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->explain = pInserterNode->explain; int64_t suid = 0; - int32_t code = pManager->storeFn.metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); + int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); if (code) { destroyDataSinker((SDataSinkHandle*)inserter); taosMemoryFree(inserter); diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index b3cb57325b..3a972c1c20 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -21,8 +21,9 @@ static SDataSinkManager gDataSinkManager = {0}; SDataSinkStat gDataSinkStat = {0}; -int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg) { +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) { gDataSinkManager.cfg = *cfg; + gDataSinkManager.pAPI = pAPI; return 0; // to avoid compiler eror } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 78a015269a..53fad6503e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -507,7 +507,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; - code = dsDataSinkMgtInit(&cfg); + code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI); if (code != TSDB_CODE_SUCCESS) { qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 83e62faf90..4bd8faf8e9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -929,7 +929,7 @@ static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); taosHashCleanup(pTableScanInfo->pIgnoreTables); - destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->readerAPI); + destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); taosMemoryFreeClear(param); } @@ -975,7 +975,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); - pInfo->readerAPI = pTaskInfo->storageAPI.tsdReader; + pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); // blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); @@ -1075,7 +1075,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6 pTableScanInfo->base.cond.endVersion = version; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; - pTableScanInfo->readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; } @@ -1250,7 +1250,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 *pRowIndex = 0; pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pTableScanInfo->readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; return NULL; } @@ -3045,6 +3045,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } + pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; pInfo->base.scanFlag = MAIN_SCAN; pInfo->base.readHandle = *readHandle; -- GitLab