提交 42a512c3 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 83782fb2
......@@ -161,17 +161,17 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockSMA, bool *allHave);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader);
......
......@@ -81,12 +81,15 @@ typedef struct SIOCostSummary {
double createScanInfoList;
} SIOCostSummary;
typedef struct SColInfo {
int16_t colId;
int16_t slotId;
} SColInfo;
typedef struct SBlockLoadSuppInfo {
SArray* pColAgg;
SColumnDataAgg tsColAgg;
SColumnDataAgg** plist; // todo remove this
int16_t* colIds; // column ids for loading file block data
int16_t* slotIds; // the ordinal index in the destination SSDataBlock
SColInfo* colInfo; // column ids for loading file block data
int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
bool smaValid; // the sma on all queried columns are activated
......@@ -159,6 +162,7 @@ struct STsdbReader {
STsdb* pTsdb;
uint64_t suid;
int16_t order;
bool freeBlock;
STimeWindow window; // the primary query time window that applies to all queries
SSDataBlock* pResBlock;
int32_t capacity;
......@@ -218,22 +222,21 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols;
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
pSupInfo->slotIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL || pSupInfo->slotIds == NULL) {
taosMemoryFree(pSupInfo->colIds);
taosMemoryFree(pSupInfo->buildBuf);
taosMemoryFree(pSupInfo->slotIds);
pSupInfo->colInfo = taosMemoryMalloc(numOfCols * (sizeof(SColInfo) + POINTER_BYTES));
if (pSupInfo->colInfo == NULL) {
taosMemoryFree(pSupInfo->colInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSupInfo->buildBuf = (char**)((char*)pSupInfo->colInfo + (sizeof(SColInfo) * numOfCols));
for (int32_t i = 0; i < numOfCols; ++i) {
pSupInfo->colIds[i] = pCols[i].colId;
pSupInfo->slotIds[i] = pSlotIdList[i];
pSupInfo->colInfo[i].colId = pCols[i].colId;
pSupInfo->colInfo[i].slotId = pSlotIdList[i];
if (IS_VAR_DATA_TYPE(pCols[i].type)) {
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
} else {
pSupInfo->buildBuf[i] = NULL;
}
}
......@@ -245,7 +248,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo)
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
STColumn* pTCol = &pSchema->columns[i];
if (pTCol->colId == pSupInfo->colIds[j]) {
if (pTCol->colId == pSupInfo->colInfo[j].colId) {
if (!IS_BSMA_ON(pTCol)) {
pSupInfo->smaValid = false;
return;
......@@ -253,7 +256,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo)
i += 1;
j += 1;
} else if (pTCol->colId < pSupInfo->colIds[j]) {
} else if (pTCol->colId < pSupInfo->colInfo[j].colId) {
// do nothing
i += 1;
} else {
......@@ -455,7 +458,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
if (pLReader->pInfo == NULL) {
// here we ignore the first column, which is always be the primary timestamp column
pLReader->pInfo =
tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colInfo[1].colId, pReader->suppInfo.numOfCols - 1);
if (pLReader->pInfo == NULL) {
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
return terrno;
......@@ -594,14 +597,22 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
ASSERT(pCond->numOfCols > 0);
if (pReader->pResBlock == NULL) {
pReader->freeBlock = true;
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
if (pReader->pResBlock == NULL) {
code = terrno;
goto _end;
}
}
// todo refactor.
limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
if (pSup->pColAgg == NULL || pSup->plist == NULL) {
if (pSup->pColAgg == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
......@@ -1083,8 +1094,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t i = 0;
int32_t rowIndex = 0;
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pSupInfo->colIds[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
if (pSupInfo->colInfo[i].colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
i += 1;
}
......@@ -1095,10 +1106,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
rowIndex = 0;
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
if (pData->cid < pSupInfo->colIds[i]) {
if (pData->cid < pSupInfo->colInfo[i].colId) {
colIndex += 1;
} else if (pData->cid == pSupInfo->colIds[i]) {
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
} else if (pData->cid == pSupInfo->colInfo[i].colId) {
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
colDataAppendNNULL(pColData, 0, dumpedRows);
......@@ -1116,7 +1127,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
colIndex += 1;
i += 1;
} else { // the specified column does not exist in file block, fill with null data
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
colDataAppendNNULL(pColData, 0, dumpedRows);
i += 1;
}
......@@ -1124,7 +1135,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
// fill the mis-matched columns with null value
while (i < numOfOutputCols) {
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
colDataAppendNNULL(pColData, 0, dumpedRows);
i += 1;
}
......@@ -1162,7 +1173,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
tBlockDataReset(pBlockData);
TABLEID tid = {.suid = pReader->suid, .uid = uid};
int32_t code =
tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colInfo[1].colId, pReader->suppInfo.numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1621,7 +1632,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
int64_t st = taosGetTimestampUs();
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotIds[0]);
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.colInfo[0].slotId);
pBlock->info.id.uid = pBlockScanInfo->uid;
setComposedBlockFlag(pReader, true);
......@@ -2493,7 +2504,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
_end:
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotIds[0]);
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.colInfo[0].slotId);
setComposedBlockFlag(pReader, true);
double el = (taosGetTimestampUs() - st) / 1000.0;
......@@ -3542,24 +3553,24 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
SColVal colVal = {0};
int32_t i = 0, j = 0;
if (pSupInfo->colIds[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pSupInfo->colInfo[i].colId== PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
i += 1;
}
while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
col_id_t colId = pSupInfo->colIds[i];
col_id_t colId = pSupInfo->colInfo[i].colId;
if (colId == pSchema->columns[j].colId) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
tTSRowGetVal(pTSRow, pSchema, j, &colVal);
doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
i += 1;
j += 1;
} else if (colId < pSchema->columns[j].colId) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
colDataAppendNULL(pColInfoData, outputRowIndex);
i += 1;
......@@ -3570,7 +3581,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
// set null value since current column does not exist in the "pSchema"
while (i < pSupInfo->numOfCols) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
colDataAppendNULL(pColInfoData, outputRowIndex);
i += 1;
}
......@@ -3586,8 +3597,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t outputRowIndex = pResBlock->info.rows;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colIds[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pReader->suppInfo.colInfo[i].colId== PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
}
......@@ -3598,13 +3609,13 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
while (i < numOfOutputCols && j < numOfInputCols) {
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
if (pData->cid < pSupInfo->colIds[i]) {
if (pData->cid < pSupInfo->colInfo[i].colId) {
j += 1;
continue;
}
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pData->cid == pSupInfo->colIds[i]) {
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
if (pData->cid == pSupInfo->colInfo[i].colId) {
tColDataGetValue(pData, rowIndex, &cv);
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
j += 1;
......@@ -3617,7 +3628,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
}
while (i < numOfOutputCols) {
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->colInfo[i].slotId);
colDataAppendNULL(pCol, outputRowIndex);
i += 1;
}
......@@ -3723,7 +3734,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pCond->twindows.ekey -= 1;
}
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, pResBlock->info.capacity, pResBlock, idstr);
int32_t capacity = 0;
if (pResBlock == NULL) {
capacity = 4096;
} else {
capacity = pResBlock->info.capacity;
}
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -3868,10 +3886,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
taosMemoryFreeClear(pSupInfo->plist);
taosMemoryFree(pSupInfo->colIds);
taosMemoryFree(pSupInfo->slotIds);
taosArrayDestroy(pSupInfo->pColAgg);
for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
if (pSupInfo->buildBuf[i] != NULL) {
......@@ -3879,9 +3893,12 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
}
taosMemoryFree(pSupInfo->buildBuf);
tBlockDataDestroy(&pReader->status.fileBlockData, true);
if (pReader->freeBlock) {
pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
}
taosMemoryFree(pSupInfo->colInfo);
tBlockDataDestroy(&pReader->status.fileBlockData, true);
cleanupDataBlockIterator(&pReader->status.blockIter);
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
......@@ -4025,18 +4042,18 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
}
}
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) {
int32_t code = 0;
*allHave = false;
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
*pBlockStatis = NULL;
*pBlockSMA = NULL;
return TSDB_CODE_SUCCESS;
}
// there is no statistics data for composed block
if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
*pBlockStatis = NULL;
*pBlockSMA = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -4054,7 +4071,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
return code;
}
} else {
*pBlockStatis = NULL;
*pBlockSMA = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -4067,32 +4084,12 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pTsAgg->min = pReader->pResBlock->info.window.skey;
pTsAgg->max = pReader->pResBlock->info.window.ekey;
pSup->plist[0] = pTsAgg;
// update the number of NULL data rows
size_t numOfCols = pSup->numOfCols;
int32_t i = 0, j = 0;
size_t size = taosArrayGetSize(pSup->pColAgg);
#if 0
while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
if (pAgg->colId == pSup->colIds[j]) {
if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
pSup->plist[j] = pAgg;
} else {
*allHave = false;
break;
}
i += 1;
j += 1;
} else if (pAgg->colId < pSup->colIds[j]) {
i += 1;
} else if (pSup->colIds[j] < pAgg->colId) {
j += 1;
}
}
#else
SSDataBlock* pResBlock = pReader->pResBlock;
if (pResBlock->pBlockAgg == NULL) {
......@@ -4100,52 +4097,40 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
}
// fill the all null data column
// SArray* pNewAggList = taosArrayInit(numOfCols, sizeof(SColumnDataAgg));
while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
if (pAgg->colId == pSup->colIds[j]) {
pResBlock->pBlockAgg[pSup->slotIds[j]] = pAgg;
if (pAgg->colId == pSup->colInfo[j].colId) {
pResBlock->pBlockAgg[pSup->colInfo[j].slotId] = pAgg;
i += 1;
j += 1;
} else if (pAgg->colId < pSup->colIds[j]) {
} else if (pAgg->colId < pSup->colInfo[j].colId) {
i += 1;
} else if (pSup->colIds[j] < pAgg->colId) {
if (pSup->colIds[j] == PRIMARYKEY_TIMESTAMP_COL_ID) {
pResBlock->pBlockAgg[pSup->slotIds[j]] = &pSup->tsColAgg;
} else if (pSup->colInfo[j].colId < pAgg->colId) {
if (pSup->colInfo[j].colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pResBlock->pBlockAgg[pSup->colInfo[j].slotId] = &pSup->tsColAgg;
} else {
// all date in this block are null
SColumnDataAgg nullColAgg = {.colId = pSup->colIds[j], .numOfNull = pBlock->nRow};
SColumnDataAgg nullColAgg = {.colId = pSup->colInfo[j].colId, .numOfNull = pBlock->nRow};
taosArrayPush(pSup->pColAgg, &nullColAgg);
pResBlock->pBlockAgg[pSup->slotIds[j]] = taosArrayGetLast(pSup->pColAgg);
pResBlock->pBlockAgg[pSup->colInfo[j].slotId] = taosArrayGetLast(pSup->pColAgg);
}
j += 1;
}
}
// taosArrayClear(pSup->pColAgg);
// size_t num = taosArrayGetSize(pSup->pColAgg);
// for(int32_t k = 0; k < num; ++k) {
// pSup->plist[k] = taosArrayGet(pSup->pColAgg, k);
// }
#endif
*pBlockSMA = pResBlock->pBlockAgg;
pReader->cost.smaDataLoad += 1;
// *pBlockStatis = pSup->plist;
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
return code;
}
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
if (pStatus->composedDataBlock) {
return pReader->pResBlock->pDataBlock;
return pReader->pResBlock;
}
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
......@@ -4166,10 +4151,10 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
}
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
return pReader->pResBlock->pDataBlock;
return pReader->pResBlock;
}
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_PREV) {
return doRetrieveDataBlock(pReader->innerReader[0]);
......@@ -4196,7 +4181,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
// allocate buffer in order to load data blocks from file
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
tsdbDataFReaderClose(&pReader->pFileReader);
......
......@@ -957,21 +957,27 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = pMtInfo->schema->nCols;
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
taosMemoryFreeClear(pCond->colList);
taosMemoryFreeClear(pCond->pSlotList);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->twindows = TSWINDOW_INITIALIZER;
pCond->suid = pMtInfo->suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = sContext->snapVersion;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
pCond->colList[i].type = pMtInfo->schema->pSchema[i].type;
pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes;
pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId;
SColumnInfo* pColInfo = &pCond->colList[i];
pColInfo->type = pMtInfo->schema->pSchema[i].type;
pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
pColInfo->colId = pMtInfo->schema->pSchema[i].colId;
pCond->pSlotList[i] = i;
}
return TSDB_CODE_SUCCESS;
......@@ -1116,7 +1122,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->pRes, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
......@@ -224,10 +224,8 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA
}
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
bool allColumnsHaveAgg = true;
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pBlock->pBlockAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -387,11 +385,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (pCols == NULL) {
SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (p == NULL) {
return terrno;
}
ASSERT(p == pBlock);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
// restore the previous value
......@@ -994,19 +993,10 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
return NULL;
}
bool hasBlock = tsdbNextDataBlock(pReader);
if (hasBlock) {
SDataBlockInfo* pBInfo = &pBlock->info;
// int32_t rows = 0;
// tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->id.uid, &pBInfo->window);
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
// relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true);
if (tsdbNextDataBlock(pReader)) {
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->id.uid);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
tsdbReaderClose(pReader);
......@@ -2030,20 +2020,13 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pBlock = &pInfo->pRes;
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.id.uid, &pBlock->info.window);
pBlock->info.rows = rows;
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
pBlock->pDataBlock = pCols;
if (pCols == NULL) {
SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
if (pBlock == NULL) {
longjmp(pTaskInfo->env, terrno);
}
......
......@@ -429,10 +429,10 @@ class TDTestCase:
tdSql.checkRows(2)
# nest query
tdSql.query(f"select unique(c1) from (select _rowts , t1 ,c1 , tbname from {dbname}.stb1 ) ")
tdSql.query(f"select unique(c1) v from (select _rowts , t1 ,c1 , tbname from {dbname}.stb1 ) order by v")
tdSql.checkRows(11)
tdSql.checkData(0,0,6)
tdSql.checkData(10,0,3)
tdSql.checkData(1,0,0)
tdSql.checkData(10,0,9)
tdSql.query(f"select unique(t1) from (select _rowts , t1 , tbname from {dbname}.stb1 )")
tdSql.checkRows(2)
tdSql.checkData(0,0,4)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册