未验证 提交 ac656dcc 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #14090 from taosdata/feature/3_liaohj

fix(query): reserve correct buffer before loading data
...@@ -263,6 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in ...@@ -263,6 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in
pColumnInfoData->varmeta.length = len + oldLen; pColumnInfoData->varmeta.length = len + oldLen;
} else { } else {
if (finalNumOfRows > *capacity) { if (finalNumOfRows > *capacity) {
ASSERT(finalNumOfRows*pColumnInfoData->info.bytes);
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
if (tmp == NULL) { if (tmp == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
...@@ -1126,6 +1127,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { ...@@ -1126,6 +1127,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
} }
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) { int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) {
ASSERT(numOfRows);
if (0 == numOfRows || numOfRows <= existRows) { if (0 == numOfRows || numOfRows <= existRows) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1178,6 +1180,8 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { ...@@ -1178,6 +1180,8 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0; int32_t code = 0;
ASSERT(numOfRows > 0);
if (numOfRows == 0) { if (numOfRows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -234,6 +234,7 @@ typedef struct SColMatchInfo { ...@@ -234,6 +234,7 @@ typedef struct SColMatchInfo {
int32_t colId; int32_t colId;
int32_t targetSlotId; int32_t targetSlotId;
bool output; bool output;
bool reserved;
int32_t matchType; // determinate the source according to col id or slot id int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo; } SColMatchInfo;
...@@ -253,7 +254,6 @@ typedef struct STableScanInfo { ...@@ -253,7 +254,6 @@ typedef struct STableScanInfo {
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
int64_t numOfRows; int64_t numOfRows;
// int32_t prevGroupId; // previous table group id
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
......
...@@ -609,7 +609,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -609,7 +609,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
} }
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
numOfRows = dest.numOfRows; numOfRows = dest.numOfRows;
...@@ -649,7 +649,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -649,7 +649,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
} }
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
numOfRows = dest.numOfRows; numOfRows = dest.numOfRows;
...@@ -1340,9 +1340,9 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ...@@ -1340,9 +1340,9 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
} }
if (rowRes != NULL) { if (rowRes != NULL) {
int32_t totalRows = pBlock->info.rows;
SSDataBlock* px = createOneDataBlock(pBlock, true); SSDataBlock* px = createOneDataBlock(pBlock, true);
int32_t totalRows = pBlock->info.rows;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
......
...@@ -224,7 +224,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -224,7 +224,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) { if (!pColMatchInfo->output) {
continue; continue;
...@@ -384,7 +384,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -384,7 +384,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue; continue;
} }
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); blockDataCleanup(pBlock);
SDataBlockInfo binfo = pBlock->info;
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &binfo);
binfo.capacity = binfo.rows;
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info = binfo;
uint32_t status = 0; uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
...@@ -530,7 +537,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -530,7 +537,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
// taosSsleep(20); // taosSsleep(20);
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册