diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 53b8eadb3304006dfcfb9c4cd62dbc636191cdae..64c15c6c0e252a1febb9e3091a391147508d0ddc 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -161,17 +161,17 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr); -void tsdbReaderClose(STsdbReader *pReader); -bool tsdbNextDataBlock(STsdbReader *pReader); -void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); -int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); -SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); -int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -void *tsdbGetIdx(SMeta *pMeta); -void *tsdbGetIvtIdx(SMeta *pMeta); -uint64_t getReaderMaxVersion(STsdbReader *pReader); +void tsdbReaderClose(STsdbReader *pReader); +bool tsdbNextDataBlock(STsdbReader *pReader); +void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); +int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockSMA, bool *allHave); +SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); +int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); +int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); +int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); +void *tsdbGetIdx(SMeta *pMeta); +void *tsdbGetIvtIdx(SMeta *pMeta); +uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, uint64_t suid, void **pReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 24e9181ae6253c40e5fa68dbea2ca92543164ae6..859ae026a2825470c3c747fbe87a5e12066f60fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -81,12 +81,15 @@ typedef struct SIOCostSummary { double createScanInfoList; } SIOCostSummary; +typedef struct SColInfo { + int16_t colId; + int16_t slotId; +} SColInfo; + typedef struct SBlockLoadSuppInfo { SArray* pColAgg; SColumnDataAgg tsColAgg; - SColumnDataAgg** plist; // todo remove this - int16_t* colIds; // column ids for loading file block data - int16_t* slotIds; // the ordinal index in the destination SSDataBlock + SColInfo* colInfo; // column ids for loading file block data int32_t numOfCols; char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. bool smaValid; // the sma on all queried columns are activated @@ -159,6 +162,7 @@ struct STsdbReader { STsdb* pTsdb; uint64_t suid; int16_t order; + bool freeBlock; STimeWindow window; // the primary query time window that applies to all queries SSDataBlock* pResBlock; int32_t capacity; @@ -218,22 +222,21 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->smaValid = true; pSupInfo->numOfCols = numOfCols; - pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t)); - pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES); - pSupInfo->slotIds = taosMemoryMalloc(numOfCols * sizeof(int16_t)); - if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL || pSupInfo->slotIds == NULL) { - taosMemoryFree(pSupInfo->colIds); - taosMemoryFree(pSupInfo->buildBuf); - taosMemoryFree(pSupInfo->slotIds); + pSupInfo->colInfo = taosMemoryMalloc(numOfCols * (sizeof(SColInfo) + POINTER_BYTES)); + if (pSupInfo->colInfo == NULL) { + taosMemoryFree(pSupInfo->colInfo); return TSDB_CODE_OUT_OF_MEMORY; } + pSupInfo->buildBuf = (char**)((char*)pSupInfo->colInfo + (sizeof(SColInfo) * numOfCols)); for (int32_t i = 0; i < numOfCols; ++i) { - pSupInfo->colIds[i] = pCols[i].colId; - pSupInfo->slotIds[i] = pSlotIdList[i]; + pSupInfo->colInfo[i].colId = pCols[i].colId; + pSupInfo->colInfo[i].slotId = pSlotIdList[i]; if (IS_VAR_DATA_TYPE(pCols[i].type)) { pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes); + } else { + pSupInfo->buildBuf[i] = NULL; } } @@ -245,7 +248,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) { STColumn* pTCol = &pSchema->columns[i]; - if (pTCol->colId == pSupInfo->colIds[j]) { + if (pTCol->colId == pSupInfo->colInfo[j].colId) { if (!IS_BSMA_ON(pTCol)) { pSupInfo->smaValid = false; return; @@ -253,7 +256,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) i += 1; j += 1; - } else if (pTCol->colId < pSupInfo->colIds[j]) { + } else if (pTCol->colId < pSupInfo->colInfo[j].colId) { // do nothing i += 1; } else { @@ -455,7 +458,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb if (pLReader->pInfo == NULL) { // here we ignore the first column, which is always be the primary timestamp column pLReader->pInfo = - tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1); + tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colInfo[1].colId, pReader->suppInfo.numOfCols - 1); if (pLReader->pInfo == NULL) { tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); return terrno; @@ -594,14 +597,22 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket ASSERT(pCond->numOfCols > 0); + if (pReader->pResBlock == NULL) { + pReader->freeBlock = true; + pReader->pResBlock = createResBlock(pCond, pReader->capacity); + if (pReader->pResBlock == NULL) { + code = terrno; + goto _end; + } + } + // todo refactor. limitOutputBufferSize(pCond, &pReader->capacity); // allocate buffer in order to load data blocks from file SBlockLoadSuppInfo* pSup = &pReader->suppInfo; pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg)); - pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES); - if (pSup->pColAgg == NULL || pSup->plist == NULL) { + if (pSup->pColAgg == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } @@ -1083,8 +1094,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t i = 0; int32_t rowIndex = 0; - SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]); - if (pSupInfo->colIds[i] == PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); + if (pSupInfo->colInfo[i].colId == PRIMARYKEY_TIMESTAMP_COL_ID) { copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc); i += 1; } @@ -1095,10 +1106,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn rowIndex = 0; SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex); - if (pData->cid < pSupInfo->colIds[i]) { + if (pData->cid < pSupInfo->colInfo[i].colId) { colIndex += 1; - } else if (pData->cid == pSupInfo->colIds[i]) { - pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]); + } else if (pData->cid == pSupInfo->colInfo[i].colId) { + pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) { colDataAppendNNULL(pColData, 0, dumpedRows); @@ -1116,7 +1127,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn colIndex += 1; i += 1; } else { // the specified column does not exist in file block, fill with null data - pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]); + pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); colDataAppendNNULL(pColData, 0, dumpedRows); i += 1; } @@ -1124,7 +1135,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn // fill the mis-matched columns with null value while (i < numOfOutputCols) { - pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]); + pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); colDataAppendNNULL(pColData, 0, dumpedRows); i += 1; } @@ -1162,7 +1173,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI tBlockDataReset(pBlockData); TABLEID tid = {.suid = pReader->suid, .uid = uid}; int32_t code = - tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1); + tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colInfo[1].colId, pReader->suppInfo.numOfCols - 1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1621,7 +1632,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* int64_t st = taosGetTimestampUs(); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); - blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotIds[0]); + blockDataUpdateTsWindow(pBlock, pReader->suppInfo.colInfo[0].slotId); pBlock->info.id.uid = pBlockScanInfo->uid; setComposedBlockFlag(pReader, true); @@ -2493,7 +2504,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { _end: pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; - blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotIds[0]); + blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.colInfo[0].slotId); setComposedBlockFlag(pReader, true); double el = (taosGetTimestampUs() - st) / 1000.0; @@ -3542,24 +3553,24 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* SColVal colVal = {0}; int32_t i = 0, j = 0; - if (pSupInfo->colIds[i]== PRIMARYKEY_TIMESTAMP_COL_ID) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]); + if (pSupInfo->colInfo[i].colId== PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId); ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts; i += 1; } while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) { - col_id_t colId = pSupInfo->colIds[i]; + col_id_t colId = pSupInfo->colInfo[i].colId; if (colId == pSchema->columns[j].colId) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId); tTSRowGetVal(pTSRow, pSchema, j, &colVal); doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo); i += 1; j += 1; } else if (colId < pSchema->columns[j].colId) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId); colDataAppendNULL(pColInfoData, outputRowIndex); i += 1; @@ -3570,7 +3581,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* // set null value since current column does not exist in the "pSchema" while (i < pSupInfo->numOfCols) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId); colDataAppendNULL(pColInfoData, outputRowIndex); i += 1; } @@ -3586,8 +3597,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t outputRowIndex = pResBlock->info.rows; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - if (pReader->suppInfo.colIds[i]== PRIMARYKEY_TIMESTAMP_COL_ID) { - SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]); + if (pReader->suppInfo.colInfo[i].colId== PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex]; i += 1; } @@ -3598,13 +3609,13 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S while (i < numOfOutputCols && j < numOfInputCols) { SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); - if (pData->cid < pSupInfo->colIds[i]) { + if (pData->cid < pSupInfo->colInfo[i].colId) { j += 1; continue; } - SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotIds[i]); - if (pData->cid == pSupInfo->colIds[i]) { + SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); + if (pData->cid == pSupInfo->colInfo[i].colId) { tColDataGetValue(pData, rowIndex, &cv); doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); j += 1; @@ -3617,7 +3628,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S } while (i < numOfOutputCols) { - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId); colDataAppendNULL(pCol, outputRowIndex); i += 1; } @@ -3723,7 +3734,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL pCond->twindows.ekey -= 1; } - int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, pResBlock->info.capacity, pResBlock, idstr); + int32_t capacity = 0; + if (pResBlock == NULL) { + capacity = 4096; + } else { + capacity = pResBlock->info.capacity; + } + + int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3868,10 +3886,6 @@ void tsdbReaderClose(STsdbReader* pReader) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - taosMemoryFreeClear(pSupInfo->plist); - taosMemoryFree(pSupInfo->colIds); - taosMemoryFree(pSupInfo->slotIds); - taosArrayDestroy(pSupInfo->pColAgg); for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) { if (pSupInfo->buildBuf[i] != NULL) { @@ -3879,9 +3893,12 @@ void tsdbReaderClose(STsdbReader* pReader) { } } - taosMemoryFree(pSupInfo->buildBuf); - tBlockDataDestroy(&pReader->status.fileBlockData, true); + if (pReader->freeBlock) { + pReader->pResBlock = blockDataDestroy(pReader->pResBlock); + } + taosMemoryFree(pSupInfo->colInfo); + tBlockDataDestroy(&pReader->status.fileBlockData, true); cleanupDataBlockIterator(&pReader->status.blockIter); size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); @@ -4025,18 +4042,18 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64 } } -int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { +int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) { int32_t code = 0; *allHave = false; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { - *pBlockStatis = NULL; + *pBlockSMA = NULL; return TSDB_CODE_SUCCESS; } // there is no statistics data for composed block if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) { - *pBlockStatis = NULL; + *pBlockSMA = NULL; return TSDB_CODE_SUCCESS; } @@ -4054,7 +4071,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS return code; } } else { - *pBlockStatis = NULL; + *pBlockSMA = NULL; return TSDB_CODE_SUCCESS; } @@ -4067,32 +4084,12 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pTsAgg->min = pReader->pResBlock->info.window.skey; pTsAgg->max = pReader->pResBlock->info.window.ekey; - pSup->plist[0] = pTsAgg; // update the number of NULL data rows size_t numOfCols = pSup->numOfCols; int32_t i = 0, j = 0; size_t size = taosArrayGetSize(pSup->pColAgg); -#if 0 - while (j < numOfCols && i < size) { - SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); - if (pAgg->colId == pSup->colIds[j]) { - if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) { - pSup->plist[j] = pAgg; - } else { - *allHave = false; - break; - } - i += 1; - j += 1; - } else if (pAgg->colId < pSup->colIds[j]) { - i += 1; - } else if (pSup->colIds[j] < pAgg->colId) { - j += 1; - } - } -#else SSDataBlock* pResBlock = pReader->pResBlock; if (pResBlock->pBlockAgg == NULL) { @@ -4100,52 +4097,40 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); } - // fill the all null data column -// SArray* pNewAggList = taosArrayInit(numOfCols, sizeof(SColumnDataAgg)); - while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); - if (pAgg->colId == pSup->colIds[j]) { - pResBlock->pBlockAgg[pSup->slotIds[j]] = pAgg; + if (pAgg->colId == pSup->colInfo[j].colId) { + pResBlock->pBlockAgg[pSup->colInfo[j].slotId] = pAgg; i += 1; j += 1; - } else if (pAgg->colId < pSup->colIds[j]) { + } else if (pAgg->colId < pSup->colInfo[j].colId) { i += 1; - } else if (pSup->colIds[j] < pAgg->colId) { - if (pSup->colIds[j] == PRIMARYKEY_TIMESTAMP_COL_ID) { - pResBlock->pBlockAgg[pSup->slotIds[j]] = &pSup->tsColAgg; + } else if (pSup->colInfo[j].colId < pAgg->colId) { + if (pSup->colInfo[j].colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + pResBlock->pBlockAgg[pSup->colInfo[j].slotId] = &pSup->tsColAgg; } else { // all date in this block are null - SColumnDataAgg nullColAgg = {.colId = pSup->colIds[j], .numOfNull = pBlock->nRow}; + SColumnDataAgg nullColAgg = {.colId = pSup->colInfo[j].colId, .numOfNull = pBlock->nRow}; taosArrayPush(pSup->pColAgg, &nullColAgg); - pResBlock->pBlockAgg[pSup->slotIds[j]] = taosArrayGetLast(pSup->pColAgg); + pResBlock->pBlockAgg[pSup->colInfo[j].slotId] = taosArrayGetLast(pSup->pColAgg); } j += 1; } } -// taosArrayClear(pSup->pColAgg); - -// size_t num = taosArrayGetSize(pSup->pColAgg); -// for(int32_t k = 0; k < num; ++k) { -// pSup->plist[k] = taosArrayGet(pSup->pColAgg, k); -// } - -#endif - + *pBlockSMA = pResBlock->pBlockAgg; pReader->cost.smaDataLoad += 1; -// *pBlockStatis = pSup->plist; tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr); return code; } -static SArray* doRetrieveDataBlock(STsdbReader* pReader) { +static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; if (pStatus->composedDataBlock) { - return pReader->pResBlock->pDataBlock; + return pReader->pResBlock; } SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); @@ -4166,10 +4151,10 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { } copyBlockDataToSDataBlock(pReader, pBlockScanInfo); - return pReader->pResBlock->pDataBlock; + return pReader->pResBlock; } -SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { +SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { return doRetrieveDataBlock(pReader->innerReader[0]); @@ -4196,7 +4181,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { // allocate buffer in order to load data blocks from file memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); - memset(pReader->suppInfo.plist, 0, POINTER_BYTES); pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; tsdbDataFReaderClose(&pReader->pFileReader); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d675ac05d5869ed601e129237b3fc7d14cf42733..df11add919e86d3465dc0974b5543cc0c8908d59 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -957,21 +957,27 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s pCond->order = TSDB_ORDER_ASC; pCond->numOfCols = pMtInfo->schema->nCols; pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); - if (pCond->colList == NULL) { + pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols); + if (pCond->colList == NULL || pCond->pSlotList == NULL) { + taosMemoryFreeClear(pCond->colList); + taosMemoryFreeClear(pCond->pSlotList); terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return terrno; } - pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->twindows = TSWINDOW_INITIALIZER; pCond->suid = pMtInfo->suid; pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = sContext->snapVersion; for (int32_t i = 0; i < pCond->numOfCols; ++i) { - pCond->colList[i].type = pMtInfo->schema->pSchema[i].type; - pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes; - pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId; + SColumnInfo* pColInfo = &pCond->colList[i]; + pColInfo->type = pMtInfo->schema->pSchema[i].type; + pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes; + pColInfo->colId = pMtInfo->schema->pSchema[i].colId; + + pCond->pSlotList[i] = i; } return TSDB_CODE_SUCCESS; @@ -1116,7 +1122,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); ASSERT(size == 1); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->pRes, &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b0104c6025a95116ecdf7dd0c8f8728b8f809df1..f09b7dae583e5b0b6c9c55106ee45c1662e249fd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -224,10 +224,8 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA } static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { - bool allColumnsHaveAgg = true; - SColumnDataAgg** pColAgg = NULL; - - int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); + bool allColumnsHaveAgg = true; + int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pBlock->pBlockAgg, &allColumnsHaveAgg); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -387,11 +385,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - if (pCols == NULL) { + SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); + if (p == NULL) { return terrno; } + ASSERT(p == pBlock); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); // restore the previous value @@ -994,19 +993,10 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU return NULL; } - bool hasBlock = tsdbNextDataBlock(pReader); - if (hasBlock) { - SDataBlockInfo* pBInfo = &pBlock->info; - -// int32_t rows = 0; -// tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->id.uid, &pBInfo->window); - - SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); - -// relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true); + if (tsdbNextDataBlock(pReader)) { + /*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); - - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->id.uid); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); } tsdbReaderClose(pReader); @@ -2030,20 +2020,13 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { qDebug("tmqsnap doRawScan called"); if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pBlock = &pInfo->pRes; - if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.id.uid, &pBlock->info.window); - pBlock->info.rows = rows; - - SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); - pBlock->pDataBlock = pCols; - if (pCols == NULL) { + SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); + if (pBlock == NULL) { longjmp(pTaskInfo->env, terrno); } diff --git a/tests/system-test/2-query/unique.py b/tests/system-test/2-query/unique.py index 2b0336d2d7bff3d601abcde3f0c0e59f390e6b6b..6af9b130ef7b36c493e7bb1edc41fa2f391bf2e0 100644 --- a/tests/system-test/2-query/unique.py +++ b/tests/system-test/2-query/unique.py @@ -429,10 +429,10 @@ class TDTestCase: tdSql.checkRows(2) # nest query - tdSql.query(f"select unique(c1) from (select _rowts , t1 ,c1 , tbname from {dbname}.stb1 ) ") + tdSql.query(f"select unique(c1) v from (select _rowts , t1 ,c1 , tbname from {dbname}.stb1 ) order by v") tdSql.checkRows(11) - tdSql.checkData(0,0,6) - tdSql.checkData(10,0,3) + tdSql.checkData(1,0,0) + tdSql.checkData(10,0,9) tdSql.query(f"select unique(t1) from (select _rowts , t1 , tbname from {dbname}.stb1 )") tdSql.checkRows(2) tdSql.checkData(0,0,4)