提交 07792850 编写于 作者: H Hongze Cheng

refact code

上级 76b5cef3
...@@ -110,30 +110,28 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -110,30 +110,28 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb // tsdb
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef void *tsdbReaderT; typedef void *STsdbReader;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, STsdbReader *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
void *pMemRef); bool isTsdbCacheLastRow(STsdbReader *pReader);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); bool tsdbNextDataBlock(STsdbReader pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(STsdbReader *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx); void tsdbResetReadHandle(STsdbReader queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle); void tsdbCleanupReadHandle(STsdbReader queryHandle);
// tq // tq
......
...@@ -119,9 +119,9 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu ...@@ -119,9 +119,9 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
......
...@@ -15,22 +15,25 @@ ...@@ -15,22 +15,25 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct STsdbReader STsdbReader; struct STsdbReader {
STsdb *pTsdb;
};
typedef struct { typedef struct {
TSKEY ts; TSKEY ts;
int64_t version; int64_t version;
} SSkylineItem; } SSkyline;
// =================== STATIC METHODS =====================
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
int32_t code = 0; int32_t code = 0;
int32_t i1 = 0; int32_t i1 = 0;
int32_t n1 = taosArrayGetSize(aSkyline1); int32_t n1 = taosArrayGetSize(aSkyline1);
int32_t i2 = 0; int32_t i2 = 0;
int32_t n2 = taosArrayGetSize(aSkyline2); int32_t n2 = taosArrayGetSize(aSkyline2);
SSkylineItem *pItem1; SSkyline *pSkyline1;
SSkylineItem *pItem2; SSkyline *pSkyline2;
SSkylineItem item; SSkyline item;
int64_t version1 = 0; int64_t version1 = 0;
int64_t version2 = 0; int64_t version2 = 0;
...@@ -39,28 +42,24 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS ...@@ -39,28 +42,24 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS
taosArrayClear(aSkyline); taosArrayClear(aSkyline);
while (i1 < n1 && i2 < n2) { while (i1 < n1 && i2 < n2) {
pItem1 = (SSkylineItem *)taosArrayGet(aSkyline1, i1); pSkyline1 = (SSkyline *)taosArrayGet(aSkyline1, i1);
pItem2 = (SSkylineItem *)taosArrayGet(aSkyline2, i2); pSkyline2 = (SSkyline *)taosArrayGet(aSkyline2, i2);
if (pItem1->ts < pItem2->ts) { if (pSkyline1->ts < pSkyline2->ts) {
version1 = pItem1->version; version1 = pSkyline1->version;
item.ts = pItem1->ts;
item.version = TMAX(version1, version2);
i1++; i1++;
} else if (pItem1->ts > pItem2->ts) { } else if (pSkyline1->ts > pSkyline2->ts) {
version2 = pItem2->version; version2 = pSkyline2->version;
item.ts = pItem2->ts;
item.version = TMAX(version1, version2);
i2++; i2++;
} else { } else {
version1 = pItem1->version; version1 = pSkyline1->version;
version2 = pItem2->version; version2 = pSkyline2->version;
item.ts = pItem1->ts;
item.version = TMAX(version1, version2);
i1++; i1++;
i2++; i2++;
} }
item.ts = TMIN(pSkyline1->ts, pSkyline2->ts);
item.version = TMAX(version1, version2);
if (taosArrayPush(aSkyline, &item) == NULL) { if (taosArrayPush(aSkyline, &item) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -68,8 +67,10 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS ...@@ -68,8 +67,10 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS
} }
while (i1 < n1) { while (i1 < n1) {
pItem1 = (SSkylineItem *)taosArrayGet(aSkyline1, i1); pSkyline1 = (SSkyline *)taosArrayGet(aSkyline1, i1);
if (taosArrayPush(aSkyline, &(SSkylineItem){.ts = pItem1->ts, .version = pItem1->version}) == NULL) { item.ts = pSkyline1->ts;
item.version = pSkyline1->version;
if (taosArrayPush(aSkyline, &item) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -77,8 +78,10 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS ...@@ -77,8 +78,10 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS
} }
while (i2 < n2) { while (i2 < n2) {
pItem2 = (SSkylineItem *)taosArrayGet(aSkyline2, i2); pSkyline2 = (SSkyline *)taosArrayGet(aSkyline2, i2);
if (taosArrayPush(aSkyline, &(SSkylineItem){.ts = pItem2->ts, .version = pItem2->version}) == NULL) { item.ts = pSkyline2->ts;
item.version = pSkyline2->version;
if (taosArrayPush(aSkyline, &item) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -88,17 +91,41 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS ...@@ -88,17 +91,41 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS
_exit: _exit:
return code; return code;
} }
static int32_t tsdbBuildDeleteSkyline(SArray *pDelArray, SArray **ppSkylineArray) { static int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
int32_t code = 0; int32_t code = 0;
SArray *pSkeylineArray = NULL; SDelData *pDelData;
int32_t nDel = pDelArray ? taosArrayGetSize(pDelArray) : 0; int32_t midx;
if (nDel == 0) goto _exit; taosArrayClear(aSkyline);
if (sidx == eidx) {
pDelData = (SDelData *)taosArrayGet(aDelData, sidx);
taosArrayPush(aSkyline, &(SSkyline){.ts = pDelData->sKey, .version = pDelData->version});
taosArrayPush(aSkyline, &(SSkyline){.ts = pDelData->eKey, .version = 0});
} else {
SArray *aSkyline1 = NULL;
SArray *aSkyline2 = NULL;
_exit: aSkyline1 = taosArrayInit(0, sizeof(SSkyline));
*ppSkylineArray = pSkeylineArray; aSkyline2 = taosArrayInit(0, sizeof(SSkyline));
return code; if (aSkyline1 == NULL || aSkyline2 == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _clear;
}
midx = (sidx + eidx) / 2;
code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
if (code) goto _clear;
code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2);
if (code) goto _clear;
code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline);
_clear:
taosArrayDestroy(aSkyline1);
taosArrayDestroy(aSkyline2);
}
_err:
return code; return code;
} }
\ No newline at end of file
...@@ -149,12 +149,3 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { ...@@ -149,12 +149,3 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
*vgId = TD_VID(pVnode); *vgId = TD_VID(pVnode);
} }
} }
// wrapper of tsdb read interface
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo* tableList, uint64_t qId,
void *pMemRef) {
#if 0
return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
#endif
return 0;
}
\ No newline at end of file
...@@ -791,7 +791,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -791,7 +791,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
......
...@@ -4518,7 +4518,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -4518,7 +4518,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
return pTaskInfo; return pTaskInfo;
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId,
SNode* pTagCond); SNode* pTagCond);
...@@ -4642,7 +4642,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4642,7 +4642,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = STsdbReader pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
return NULL; return NULL;
...@@ -4686,7 +4686,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4686,7 +4686,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = { STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
tsdbReaderT pDataReader = NULL; STsdbReader pDataReader = NULL;
if (pHandle->vnode) { if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else { } else {
...@@ -5166,7 +5166,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa ...@@ -5166,7 +5166,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
return code; return code;
} }
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code = int32_t code =
getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond);
...@@ -5186,7 +5186,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* ...@@ -5186,7 +5186,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error; goto _error;
} }
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); STsdbReader* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
clearupQueryTableDataCond(&cond); clearupQueryTableDataCond(&cond);
return pReader; return pReader;
......
...@@ -529,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -529,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STsdbReader pDataReader,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -804,7 +804,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -804,7 +804,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
} }
} }
/* Todo(liuyao) for partition by column /* Todo(liuyao) for partition by column
SSDataBlock* pBlock = createOneDataBlock(pResult, true); SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult); blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
...@@ -814,7 +814,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -814,7 +814,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
} }
} }
return pResult; return pResult;
*/ */
} }
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
...@@ -829,7 +829,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa ...@@ -829,7 +829,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
int32_t i = 0; int32_t i = 0;
for ( ; i < size; i++) { for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
if (pInfo->groupId != id) { if (pInfo->groupId != id) {
...@@ -1851,7 +1851,7 @@ _error: ...@@ -1851,7 +1851,7 @@ _error:
} }
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
SArray* dataReaders; // array of tsdbReaderT* SArray* dataReaders; // array of STsdbReader*
SReadHandle readHandle; SReadHandle readHandle;
int32_t bufPageSize; int32_t bufPageSize;
...@@ -1924,7 +1924,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand ...@@ -1924,7 +1924,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId); STsdbReader* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId);
taosArrayPush(arrayReader, &pReader); taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subListInfo->pTableList); taosArrayDestroy(subListInfo->pTableList);
...@@ -1979,7 +1979,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc ...@@ -1979,7 +1979,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
bool allColumnsHaveAgg = true; bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL; SColumnDataAgg** pColAgg = NULL;
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg); tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg);
if (allColumnsHaveAgg == true) { if (allColumnsHaveAgg == true) {
...@@ -2020,7 +2020,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc ...@@ -2020,7 +2020,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
pCost->totalCheckedRows += pBlock->info.rows; pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1; pCost->loadBlocks += 1;
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
if (pCols == NULL) { if (pCols == NULL) {
return terrno; return terrno;
...@@ -2066,7 +2066,7 @@ static SSDataBlock* getTableDataBlock(void* param) { ...@@ -2066,7 +2066,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) { if (isTaskKilled(pOperator->pTaskInfo)) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
...@@ -2195,7 +2195,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2195,7 +2195,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
...@@ -2213,8 +2212,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2213,8 +2212,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
SSDataBlock* pBlock = SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
...@@ -2229,7 +2227,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2229,7 +2227,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
clearupQueryTableDataCond(&pTableScanInfo->cond); clearupQueryTableDataCond(&pTableScanInfo->cond);
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
tsdbCleanupReadHandle(reader); tsdbCleanupReadHandle(reader);
} }
taosArrayDestroy(pTableScanInfo->dataReaders); taosArrayDestroy(pTableScanInfo->dataReaders);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册