提交 1dd89a44 编写于 作者: H Hongze Cheng

Merge branch 'feature/3_liaohj' of https://github.com/taosdata/TDengine into feature/3_liaohj

...@@ -131,7 +131,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTabl ...@@ -131,7 +131,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTabl
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
......
...@@ -55,6 +55,7 @@ typedef struct SIOCostSummary { ...@@ -55,6 +55,7 @@ typedef struct SIOCostSummary {
} SIOCostSummary; } SIOCostSummary;
typedef struct SBlockLoadSuppInfo { typedef struct SBlockLoadSuppInfo {
SArray* pColAgg;
SColumnDataAgg tsColAgg; SColumnDataAgg tsColAgg;
SColumnDataAgg** plist; SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data int16_t* colIds; // column ids for loading file block data
...@@ -364,8 +365,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -364,8 +365,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES); pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
if (pSup->plist == NULL) { if (pSup->pColAgg == NULL || pSup->plist == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
} }
...@@ -2649,13 +2651,10 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -2649,13 +2651,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
blockDataDestroy(pReader->pResBlock); blockDataDestroy(pReader->pResBlock);
taosMemoryFreeClear(pReader->suppInfo.plist); taosMemoryFreeClear(pReader->suppInfo.plist);
taosArrayDestroy(pReader->suppInfo.pColAgg);
taosMemoryFree(pReader->suppInfo.slotIds); taosMemoryFree(pReader->suppInfo.slotIds);
if (!isEmptyQueryTimeWindow(&pReader->window)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
} else {
ASSERT(pReader->status.pTableMap == NULL);
}
#if 0 #if 0
// if (pReader->status.pTableScanInfo != NULL) { // if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo); // pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
...@@ -2727,7 +2726,7 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI ...@@ -2727,7 +2726,7 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
pDataBlockInfo->window = pReader->pResBlock->info.window; pDataBlockInfo->window = pReader->pResBlock->info.window;
} }
int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
int32_t code = 0; int32_t code = 0;
*allHave = false; *allHave = false;
...@@ -2743,12 +2742,13 @@ int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBl ...@@ -2743,12 +2742,13 @@ int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBl
int64_t stime = taosGetTimestampUs(); int64_t stime = taosGetTimestampUs();
SArray* pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg)); SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
if (tBlockHasSma(pBlock)) { if (tBlockHasSma(pBlock)) {
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pColAgg, NULL); code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64", code:%s, %s", 0, pFBlock->uid, tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
tstrerror(code), pReader->idStr); pReader->idStr);
return code; return code;
} }
} else { } else {
...@@ -2756,44 +2756,44 @@ int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBl ...@@ -2756,44 +2756,44 @@ int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBl
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*allHave = true; *allHave = true;
// always load the first primary timestamp column data // always load the first primary timestamp column data
SColumnDataAgg* pTsAgg = &pReader->suppInfo.tsColAgg; SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
pTsAgg->numOfNull = 0; pTsAgg->numOfNull = 0;
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pTsAgg->min = pReader->pResBlock->info.window.skey; pTsAgg->min = pReader->pResBlock->info.window.skey;
pTsAgg->max = pReader->pResBlock->info.window.ekey; pTsAgg->max = pReader->pResBlock->info.window.ekey;
pReader->suppInfo.plist[0] = pTsAgg; pSup->plist[0] = pTsAgg;
// update the number of NULL data rows // update the number of NULL data rows
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock); size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while(j < numOfCols && i < taosArrayGetSize(pColAgg)) { while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) {
SColumnDataAgg* pAgg = taosArrayGet(pColAgg, i); SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
if (pAgg->colId == pReader->suppInfo.colIds[j]) { if (pAgg->colId == pSup->colIds[j]) {
if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) { if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
pReader->suppInfo.plist[j] = pAgg; pSup->plist[j] = pAgg;
i += 1; i += 1;
j += 1; j += 1;
} else { } else {
*allHave = false; *allHave = false;
} }
} else if (pAgg->colId < pReader->suppInfo.colIds[j]) { } else if (pAgg->colId < pSup->colIds[j]) {
i += 1; i += 1;
} else if (pReader->suppInfo.colIds[j] < pAgg->colId) { } else if (pSup->colIds[j] < pAgg->colId) {
j += 1; j += 1;
} }
} }
int64_t elapsed = taosGetTimestampUs() - stime; int64_t elapsed = taosGetTimestampUs() - stime;
pReader->cost.smaLoadTime += elapsed; pReader->cost.smaLoadTime += elapsed;
*pBlockStatis = pReader->suppInfo.plist; *pBlockStatis = pSup->plist;
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64", elapsed time:%"PRId64"us, %s", 0, pFBlock->uid, tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%" PRId64 "us, %s", 0, pFBlock->uid,
elapsed, pReader->idStr); elapsed, pReader->idStr);
return code; return code;
...@@ -2841,6 +2841,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_ ...@@ -2841,6 +2841,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
memset(pReader->suppInfo.plist, 0, POINTER_BYTES); memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
// todo set the correct numOfTables // todo set the correct numOfTables
int32_t numOfTables = 1; int32_t numOfTables = 1;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
......
...@@ -1130,7 +1130,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc ...@@ -1130,7 +1130,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
} else if ((*status) == BLK_DATA_SMA_LOAD) { } else if ((*status) == BLK_DATA_SMA_LOAD) {
// this function never returns error? // this function never returns error?
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
// tsdbRetrieveDatablockSMAInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); // tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); // pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
...@@ -1141,7 +1141,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc ...@@ -1141,7 +1141,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
// load the data block statistics to perform further filter // load the data block statistics to perform further filter
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
// tsdbRetrieveDatablockSMAInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); // tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) { if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
{ // set previous window { // set previous window
......
...@@ -210,7 +210,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -210,7 +210,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool allColumnsHaveAgg = true; bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL; SColumnDataAgg** pColAgg = NULL;
int32_t code = tsdbRetrieveDatablockSMAInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -2230,7 +2230,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc ...@@ -2230,7 +2230,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
bool allColumnsHaveAgg = true; bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL; SColumnDataAgg** pColAgg = NULL;
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
tsdbRetrieveDatablockSMAInfo(reader, &pColAgg, &allColumnsHaveAgg); tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
if (allColumnsHaveAgg == true) { if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册