From 5bd3732adfcf6a783fe1b3dfee24058abd8a3bff Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 16 Jun 2022 09:53:07 +0000 Subject: [PATCH] refact more code --- source/dnode/vnode/inc/vnode.h | 20 +++---- source/dnode/vnode/src/inc/vnodeInt.h | 30 +++++----- source/dnode/vnode/src/tsdb/tsdbRead.c | 76 +++++++++---------------- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 17 +++--- source/libs/executor/src/scanoperator.c | 5 +- 6 files changed, 63 insertions(+), 87 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7290c50bca..a797d6981c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,16 +116,16 @@ typedef struct STsdbReader STsdbReader; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -STsdbReader *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, - uint64_t taskId); -bool tsdbNextDataBlock(STsdbReader *pReader); -void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); -int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); -SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); -void tsdbCleanupReadHandle(STsdbReader *pReader); -int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); +int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, + uint64_t taskId, STsdbReader **ppReader); +bool tsdbNextDataBlock(STsdbReader *pReader); +void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); +int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); +SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); +void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); +void tsdbCleanupReadHandle(STsdbReader *pReader); +int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); +int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 75d7d3d444..06cde5ac00 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -110,22 +110,20 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); -int tsdbClose(STsdb** pTsdb); -int32_t tsdbBegin(STsdb* pTsdb); -int32_t tsdbCommit(STsdb* pTsdb); -int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); -int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, - SSubmitBlkRsp* pRsp); -int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -STsdbReader* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - uint64_t taskId); -STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - void* pMemRef); -int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); -int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); -int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); +int tsdbClose(STsdb** pTsdb); +int32_t tsdbBegin(STsdb* pTsdb); +int32_t tsdbCommit(STsdb* pTsdb); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); +int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); +int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, + SSubmitBlkRsp* pRsp); +int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); +STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + void* pMemRef); +int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); +int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); +int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); // tq int tqInit(); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b398d7dbbd..644dd21e6e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -141,23 +141,6 @@ struct STsdbReader { STSchema* pSchema; }; -static STimeWindow updateLastrowForEachGroup(STableListInfo* pList); -static int32_t checkForCachedLastRow(STsdbReader* pTsdbReadHandle, STableListInfo* pList); -static int32_t checkForCachedLast(STsdbReader* pTsdbReadHandle); -// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey); - -static void changeQueryHandleForInterpQuery(STsdbReader* pHandle); -static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); -static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, - STsdbReader* pTsdbReadHandle); -static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2); -// static int32_t doGetExternalRow(STsdbReader* pTsdbReadHandle, int16_t type, void* pMemRef); -// static void* doFreeColumnInfoData(SArray* pColumnInfoData); -// static void* destroyTableCheckInfo(SArray* pTableCheckInfo); -static bool tsdbGetExternalRow(STsdbReader* pHandle); - -static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReader* pReadHandle, TSKEY winSKey, SRetention* retentions); - static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; pBlockLoadInfo->uid = 0; @@ -575,33 +558,33 @@ void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCo // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); } -STsdbReader* tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, - uint64_t taskId) { - pCond->twindows[0] = updateLastrowForEachGroup(pList); +// STsdbReader* tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, +// uint64_t taskId) { +// pCond->twindows[0] = updateLastrowForEachGroup(pList); - // no qualified table - if (taosArrayGetSize(pList->pTableList) == 0) { - return NULL; - } +// // no qualified table +// if (taosArrayGetSize(pList->pTableList) == 0) { +// return NULL; +// } - STsdbReader* pTsdbReadHandle = (STsdbReader*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId); - if (pTsdbReadHandle == NULL) { - return NULL; - } +// STsdbReader* pTsdbReadHandle = (STsdbReader*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId); +// if (pTsdbReadHandle == NULL) { +// return NULL; +// } - int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList); - if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 - terrno = code; - return NULL; - } +// int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList); +// if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 +// terrno = code; +// return NULL; +// } - assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey); - if (pTsdbReadHandle->cachelastrow) { - pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST; - } +// assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey); +// if (pTsdbReadHandle->cachelastrow) { +// pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST; +// } - return pTsdbReadHandle; -} +// return pTsdbReadHandle; +// } #if 0 STsdbReader * tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) { @@ -1226,13 +1209,6 @@ _error: return terrno; } -static int32_t getEndPosInDataBlock(STsdbReader* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); -static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, - int32_t end); -static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle); -static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, - SDataBlockInfo* pBlockInfo, int32_t endPos); - static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { SQueryFilePos* cur = &pTsdbReadHandle->cur; STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); @@ -1340,8 +1316,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBl return code; } -static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); - static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -3659,8 +3633,10 @@ void tsdbCleanupReadHandle(STsdbReader* pReader) { taosMemoryFreeClear(pReader); } -STsdbReader* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - uint64_t taskId) { +int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + uint64_t taskId, STsdbReader** ppReader) { + int32_t code = 0; + STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); if (pReader == NULL) { return NULL; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e55eebfc90..36e3e70378 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -798,7 +798,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader *pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 253b990913..000fc1c6a2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4407,9 +4407,9 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT return pTaskInfo; } -static STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, - SNode* pTagCond); +static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, + SNode* pTagCond); static SArray* extractColumnInfo(SNodeList* pNodeList); @@ -4530,7 +4530,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - STsdbReader pDataReader = + STsdbReader* pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); if (pDataReader == NULL && terrno != 0) { return NULL; @@ -4574,7 +4574,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STimeWindowAggSupp twSup = { .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; - STsdbReader pDataReader = NULL; + STsdbReader* pDataReader = NULL; if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); } else { @@ -5049,8 +5049,8 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa return code; } -STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { +STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { @@ -5069,7 +5069,8 @@ STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* goto _error; } - STsdbReader* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + STsdbReader* pReader; + code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId, &pReader); clearupQueryTableDataCond(&cond); return pReader; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b6805c6527..9b84f68fe7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -529,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { } } -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader pDataReader, +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader* pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1929,7 +1929,8 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); - STsdbReader* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId); + STsdbReader* pReader; + code = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId, &pReader); taosArrayPush(arrayReader, &pReader); taosArrayDestroy(subListInfo->pTableList); -- GitLab