提交 f8a2ab83 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 c996837f
......@@ -249,10 +249,11 @@ typedef struct SColumnInfoData {
typedef struct SQueryTableDataCond {
uint64_t suid;
int32_t order; // desc|asc order to iterate the data block
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
int32_t* pSlotList; // the column output destation slot, and it may be null
int32_t type; // data block load type:
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
......
......@@ -41,7 +41,7 @@ typedef struct SBlockOrderInfo {
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
#define colDataSetNotNull_f(bm_, r_) \
#define colDataClearNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
......@@ -151,9 +151,6 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui
for (int32_t i = start; i < start + nRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
}
int32_t bytes = pColumnInfoData->info.bytes;
memset(pColumnInfoData->pData + start * bytes, 0, nRows * bytes);
}
pColumnInfoData->hasNull = true;
......@@ -238,6 +235,7 @@ int32_t blockDataEnsureCapacityNoClear(SSDataBlock* pDataBlock, uint32_t numOfRo
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
void blockDataEmpty(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
......
......@@ -1137,17 +1137,15 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
}
void blockDataCleanup(SSDataBlock* pDataBlock) {
blockDataEmpty(pDataBlock);
SDataBlockInfo* pInfo = &pDataBlock->info;
int32_t existedRows = pInfo->rows;
ASSERT(existedRows <= pDataBlock->info.capacity);
pInfo->rows = 0;
pInfo->id.uid = 0;
pInfo->id.groupId = 0;
pInfo->window.ekey = 0;
pInfo->window.skey = 0;
}
void blockDataEmpty(SSDataBlock* pDataBlock) {
SDataBlockInfo* pInfo = &pDataBlock->info;
ASSERT(pInfo->rows <= pDataBlock->info.capacity);
if (pInfo->capacity == 0) {
return;
}
......@@ -1155,8 +1153,12 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfoDataCleanup(p, existedRows);
colInfoDataCleanup(p, pInfo->capacity);
}
pInfo->rows = 0;
pInfo->window.ekey = 0;
pInfo->window.skey = 0;
}
// todo temporarily disable it
......@@ -1643,6 +1645,8 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
// clear the offset value of the unused entries.
memset(&pColInfoData->varmeta.offset[total - n], 0, n);
} else {
int32_t bytes = pColInfoData->info.bytes;
......@@ -1657,7 +1661,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) {
}
if (pBlock->info.rows <= n) {
blockDataCleanup(pBlock);
blockDataEmpty(pBlock);
} else {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -1674,12 +1678,22 @@ static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
memset(&pColInfoData->varmeta.offset[n], 0, total - n);
} else { // reset the bitmap value
int32_t stopIndex = BitmapLen(n) * 8;
for(int32_t i = n + 1; i < stopIndex; ++i) {
colDataClearNull_f(pColInfoData->nullbitmap, i);
}
int32_t remain = BitmapLen(total) - BitmapLen(n);
if (remain > 0) {
memset(pColInfoData->nullbitmap, 0, remain);
}
}
}
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
if (n == 0) {
blockDataCleanup(pBlock);
blockDataEmpty(pBlock);
return TSDB_CODE_SUCCESS;
}
......
......@@ -159,11 +159,10 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
STsdbReader **ppReader, const char *idstr);
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
......
......@@ -85,7 +85,8 @@ typedef struct SBlockLoadSuppInfo {
SArray* pColAgg;
SColumnDataAgg tsColAgg;
SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data
int16_t* colIds; // column ids for loading file block data
int16_t* slotIds; // the ordinal index in the destination SSDataBlock
int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
bool smaValid; // the sma on all queried columns are activated
......@@ -214,25 +215,25 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SSDataBlock* pBlock) {
size_t numOfCols = blockDataGetNumOfCols(pBlock);
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols;
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
pSupInfo->slotIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL || pSupInfo->slotIds == NULL) {
taosMemoryFree(pSupInfo->colIds);
taosMemoryFree(pSupInfo->buildBuf);
taosMemoryFree(pSupInfo->slotIds);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
pSupInfo->colIds[i] = pCol->info.colId;
pSupInfo->colIds[i] = pCols[i].colId;
pSupInfo->slotIds[i] = pSlotIdList[i];
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
if (IS_VAR_DATA_TYPE(pCols[i].type)) {
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
}
}
......@@ -566,7 +567,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
}
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
const char* idstr) {
SSDataBlock* pResBlock, const char* idstr) {
int32_t code = 0;
int8_t level = 0;
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
......@@ -585,6 +586,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = capacity;
pReader->pResBlock = pResBlock;
pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
......@@ -592,6 +594,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
ASSERT(pCond->numOfCols > 0);
// todo refactor.
limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file
......@@ -611,13 +614,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
goto _end;
}
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
if (pReader->pResBlock == NULL) {
code = terrno;
goto _end;
}
setColumnIdSlotList(&pReader->suppInfo, pReader->pResBlock);
setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
*ppReader = pReader;
return code;
......@@ -1041,17 +1038,16 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
}
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int32_t numOfOutputCols = pSupInfo->numOfCols;
SColVal cv = {0};
int64_t st = taosGetTimestampUs();
......@@ -1087,8 +1083,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t i = 0;
int32_t rowIndex = 0;
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pSupInfo->colIds[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
i += 1;
}
......@@ -1097,12 +1093,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t num = pBlockData->nColData;
while (i < numOfOutputCols && colIndex < num) {
rowIndex = 0;
pColData = taosArrayGet(pResBlock->pDataBlock, i);
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
if (pData->cid < pColData->info.colId) {
if (pData->cid < pSupInfo->colIds[i]) {
colIndex += 1;
} else if (pData->cid == pColData->info.colId) {
} else if (pData->cid == pSupInfo->colIds[i]) {
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
colDataAppendNNULL(pColData, 0, dumpedRows);
} else {
......@@ -1119,6 +1116,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
colIndex += 1;
i += 1;
} else { // the specified column does not exist in file block, fill with null data
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
colDataAppendNNULL(pColData, 0, dumpedRows);
i += 1;
}
......@@ -1126,7 +1124,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
// fill the mis-matched columns with null value
while (i < numOfOutputCols) {
pColData = taosArrayGet(pResBlock->pDataBlock, i);
pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
colDataAppendNNULL(pColData, 0, dumpedRows);
i += 1;
}
......@@ -3535,7 +3533,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
STableBlockScanInfo* pScanInfo) {
int32_t numOfRows = pBlock->info.rows;
int32_t outputRowIndex = pBlock->info.rows;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
int64_t uid = pScanInfo->uid;
......@@ -3545,23 +3543,26 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
SColVal colVal = {0};
int32_t i = 0, j = 0;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
if (pSupInfo->colIds[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
i += 1;
}
while (i < numOfCols && j < pSchema->numOfCols) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
col_id_t colId = pColInfoData->info.colId;
col_id_t colId = pSupInfo->colIds[i];
if (colId == pSchema->columns[j].colId) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
tTSRowGetVal(pTSRow, pSchema, j, &colVal);
doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
i += 1;
j += 1;
} else if (colId < pSchema->columns[j].colId) {
colDataAppendNULL(pColInfoData, numOfRows);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
colDataAppendNULL(pColInfoData, outputRowIndex);
i += 1;
} else if (colId > pSchema->columns[j].colId) {
j += 1;
......@@ -3570,8 +3571,8 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
// set null value since current column does not exist in the "pSchema"
while (i < numOfCols) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(pColInfoData, numOfRows);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
colDataAppendNULL(pColInfoData, outputRowIndex);
i += 1;
}
......@@ -3586,27 +3587,25 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t outputRowIndex = pResBlock->info.rows;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pReader->suppInfo.colIds[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
}
SColVal cv = {0};
int32_t numOfInputCols = pBlockData->nColData;
int32_t numOfOutputCols = pResBlock->pDataBlock->size;
int32_t numOfOutputCols = pSupInfo->numOfCols;
while (i < numOfOutputCols && j < numOfInputCols) {
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, i);
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
if (pData->cid < pCol->info.colId) {
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
if (pData->cid < pSupInfo->colIds[i]) {
j += 1;
continue;
}
if (pData->cid == pCol->info.colId) {
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
if (pData->cid == pSupInfo->colIds[i]) {
tColDataGetValue(pData, rowIndex, &cv);
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
j += 1;
......@@ -3619,7 +3618,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
}
while (i < numOfOutputCols) {
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotIds[i]);
colDataAppendNULL(pCol, outputRowIndex);
i += 1;
}
......@@ -3718,14 +3717,14 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
STsdbReader** ppReader, const char* idstr) {
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
pCond->twindows.ekey -= 1;
}
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, pResBlock->info.capacity, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -3751,7 +3750,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
// here we only need one more row, so the capacity is set to be ONE.
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -3765,7 +3764,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
pCond->order = order;
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -3837,10 +3836,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
_err:
_err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
return code;
}
}
void tsdbReaderClose(STsdbReader* pReader) {
if (pReader == NULL) {
......@@ -3874,7 +3873,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pSupInfo->colIds);
taosArrayDestroy(pSupInfo->pColAgg);
for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
if (pSupInfo->buildBuf[i] != NULL) {
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
}
......@@ -3891,8 +3890,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
}
blockDataDestroy(pReader->pResBlock);
if (pReader->pFileReader != NULL) {
tsdbDataFReaderClose(&pReader->pFileReader);
}
......@@ -4007,16 +4004,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
}
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
STableBlockScanInfo* pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false;
}
return true;
}
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
ASSERT(pReader != NULL);
*rows = pReader->pResBlock->info.rows;
......
......@@ -1241,6 +1241,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
}
}
// set the output flag for each column in SColMatchInfo, according to the
*numOfOutputCols = 0;
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
for (int32_t i = 0; i < num; ++i) {
......@@ -1599,20 +1600,22 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t)*pCond->numOfCols);
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFreeClear(pCond->colList);
taosMemoryFreeClear(pCond->pSlotList);
return terrno;
}
// pCond->twindow = pTableScanNode->scanRange;
// TODO: get it from stable scan node
pCond->twindows = pTableScanNode->scanRange;
pCond->suid = pTableScanNode->scan.suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
// pCond->type = pTableScanNode->scanFlag;
int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
......@@ -1625,6 +1628,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->colList[j].type = pColNode->node.resType.type;
pCond->colList[j].bytes = pColNode->node.resType.bytes;
pCond->colList[j].colId = pColNode->colId;
pCond->pSlotList[j] = pNode->slotId;
j += 1;
}
......@@ -1962,7 +1967,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
struct SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
int64_t st = taosGetTimestampUs();
const char* idStr = GET_TASKID(pTaskInfo);
......
......@@ -1064,7 +1064,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
&pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->base.dataReader == NULL) {
ASSERT(0);
}
......@@ -1116,7 +1116,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->pRes, &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
......@@ -1190,8 +1190,6 @@ void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// }
//
// tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
//
// if (pQueryAttr->limit.offset > blockInfo.rows) {
// pQueryAttr->limit.offset -= blockInfo.rows;
// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
......
......@@ -285,7 +285,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0;
blockDataEmpty(pBlock);
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
} else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
......@@ -390,7 +390,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
return terrno;
}
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
// restore the previous value
......@@ -638,16 +637,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
blockDataCleanup(pBlock);
SDataBlockInfo* pBInfo = &pBlock->info;
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->id.uid, &pBInfo->window);
blockDataEnsureCapacity(pBlock, rows); // todo remove it latter
pBInfo->rows = rows;
ASSERT(pBInfo->id.uid != 0);
ASSERT(pBlock->info.id.uid != 0);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
uint32_t status = 0;
......@@ -778,7 +768,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -879,14 +869,14 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->base.scanFlag = MAIN_SCAN;
pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
pInfo->base.readHandle = *readHandle;
pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacityNoClear(pInfo->pResBlock, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
......@@ -993,10 +983,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
blockDataCleanup(pBlock);
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
......@@ -2301,7 +2289,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->base.dataReader = NULL;
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, &pTSInfo->base.dataReader, NULL);
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL);
if (code != 0) {
terrno = code;
destroyTableScanOperatorInfo(pTableScanOp);
......@@ -2531,11 +2519,10 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
blockDataCleanup(pBlock);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2553,12 +2540,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
blockDataCleanup(pBlock);
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.id.uid, &pBlock->info.window);
blockDataEnsureCapacity(pBlock, rows);
pBlock->info.rows = rows;
if (pQueryCond->order == TSDB_ORDER_ASC) {
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
} else {
......
......@@ -1899,53 +1899,52 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
{
SQueryTableDataCond cond = {0};
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pResBlock, 1);
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
{
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
}
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
}
}
pInfo->readHandle = *readHandle;
pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pResBlock, 1);
pInfo->readHandle = *readHandle;
pInfo->uid = pBlockScanNode->suid;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator;
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
\ No newline at end of file
......@@ -1739,8 +1739,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 1024);
blockDataEnsureCapacityNoClear(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
initResultSizeInfo(&pOperator->resultInfo, 512);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
......@@ -4319,7 +4319,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
initResultSizeInfo(&pOperator->resultInfo, 512);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
......
......@@ -1782,7 +1782,7 @@ void vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
if (colDataIsNull_s(pOut->columnData, i)) {
int8_t v = 0;
colDataAppendInt8(pOut->columnData, i, &v);
colDataSetNotNull_f(pOut->columnData->nullbitmap, i);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
}
pOut->columnData->hasNull = false;
......
......@@ -279,7 +279,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
return NULL;
}
void *p = taosMemoryCalloc(pHashObj->capacity, sizeof(SHashEntry));
void *p = taosMemoryMalloc(pHashObj->capacity * sizeof(SHashEntry));
if (p == NULL) {
taosArrayDestroy(pHashObj->pMemBlock);
taosMemoryFree(pHashObj->hashList);
......@@ -290,6 +290,9 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry));
pHashObj->hashList[i]->num = 0;
pHashObj->hashList[i]->latch = 0;
pHashObj->hashList[i]->next = NULL;
}
taosArrayPush(pHashObj->pMemBlock, &p);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册