提交 5bd3732a 编写于 作者: H Hongze Cheng

refact more code

上级 5485c7fc
...@@ -116,16 +116,16 @@ typedef struct STsdbReader STsdbReader; ...@@ -116,16 +116,16 @@ typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
STsdbReader *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId); uint64_t taskId, STsdbReader **ppReader);
bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(STsdbReader *pReader); void tsdbCleanupReadHandle(STsdbReader *pReader);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
......
...@@ -110,22 +110,20 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); ...@@ -110,22 +110,20 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId); void* pMemRef);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
void* pMemRef); int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
// tq // tq
int tqInit(); int tqInit();
......
...@@ -141,23 +141,6 @@ struct STsdbReader { ...@@ -141,23 +141,6 @@ struct STsdbReader {
STSchema* pSchema; STSchema* pSchema;
}; };
static STimeWindow updateLastrowForEachGroup(STableListInfo* pList);
static int32_t checkForCachedLastRow(STsdbReader* pTsdbReadHandle, STableListInfo* pList);
static int32_t checkForCachedLast(STsdbReader* pTsdbReadHandle);
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(STsdbReader* pHandle);
static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbReader* pTsdbReadHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
// static int32_t doGetExternalRow(STsdbReader* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void* doFreeColumnInfoData(SArray* pColumnInfoData);
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(STsdbReader* pHandle);
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReader* pReadHandle, TSKEY winSKey, SRetention* retentions);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1; pBlockLoadInfo->slot = -1;
pBlockLoadInfo->uid = 0; pBlockLoadInfo->uid = 0;
...@@ -575,33 +558,33 @@ void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCo ...@@ -575,33 +558,33 @@ void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCo
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
} }
STsdbReader* tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, // STsdbReader* tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId,
uint64_t taskId) { // uint64_t taskId) {
pCond->twindows[0] = updateLastrowForEachGroup(pList); // pCond->twindows[0] = updateLastrowForEachGroup(pList);
// no qualified table // // no qualified table
if (taosArrayGetSize(pList->pTableList) == 0) { // if (taosArrayGetSize(pList->pTableList) == 0) {
return NULL; // return NULL;
} // }
STsdbReader* pTsdbReadHandle = (STsdbReader*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId); // STsdbReader* pTsdbReadHandle = (STsdbReader*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId);
if (pTsdbReadHandle == NULL) { // if (pTsdbReadHandle == NULL) {
return NULL; // return NULL;
} // }
int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList); // int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 // if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; // terrno = code;
return NULL; // return NULL;
} // }
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey); // assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey);
if (pTsdbReadHandle->cachelastrow) { // if (pTsdbReadHandle->cachelastrow) {
pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST; // pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
} // }
return pTsdbReadHandle; // return pTsdbReadHandle;
} // }
#if 0 #if 0
STsdbReader * tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) { STsdbReader * tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) {
...@@ -1226,13 +1209,6 @@ _error: ...@@ -1226,13 +1209,6 @@ _error:
return terrno; return terrno;
} }
static int32_t getEndPosInDataBlock(STsdbReader* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start,
int32_t end);
static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos);
static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
...@@ -1340,8 +1316,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBl ...@@ -1340,8 +1316,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBl
return code; return code;
} }
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
bool* exists) { bool* exists) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
...@@ -3659,8 +3633,10 @@ void tsdbCleanupReadHandle(STsdbReader* pReader) { ...@@ -3659,8 +3633,10 @@ void tsdbCleanupReadHandle(STsdbReader* pReader) {
taosMemoryFreeClear(pReader); taosMemoryFreeClear(pReader);
} }
STsdbReader* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) { uint64_t taskId, STsdbReader** ppReader) {
int32_t code = 0;
STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pReader == NULL) { if (pReader == NULL) {
return NULL; return NULL;
......
...@@ -798,7 +798,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -798,7 +798,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader *pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
......
...@@ -4407,9 +4407,9 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -4407,9 +4407,9 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
return pTaskInfo; return pTaskInfo;
} }
static STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId,
SNode* pTagCond); SNode* pTagCond);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
...@@ -4530,7 +4530,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4530,7 +4530,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STsdbReader pDataReader = STsdbReader* pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
return NULL; return NULL;
...@@ -4574,7 +4574,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4574,7 +4574,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = { STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
STsdbReader pDataReader = NULL; STsdbReader* pDataReader = NULL;
if (pHandle->vnode) { if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else { } else {
...@@ -5049,8 +5049,8 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa ...@@ -5049,8 +5049,8 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
return code; return code;
} }
STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code = int32_t code =
getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5069,7 +5069,8 @@ STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* ...@@ -5069,7 +5069,8 @@ STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error; goto _error;
} }
STsdbReader* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); STsdbReader* pReader;
code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId, &pReader);
clearupQueryTableDataCond(&cond); clearupQueryTableDataCond(&cond);
return pReader; return pReader;
......
...@@ -529,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -529,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader pDataReader, SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader* pDataReader,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -1929,7 +1929,8 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand ...@@ -1929,7 +1929,8 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
STsdbReader* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId); STsdbReader* pReader;
code = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId, &pReader);
taosArrayPush(arrayReader, &pReader); taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subListInfo->pTableList); taosArrayDestroy(subListInfo->pTableList);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册