未验证 提交 035eb6c7 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #15840 from taosdata/szhou/fix/leave

fix: ensure capacity before join left and right
...@@ -354,8 +354,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type); ...@@ -354,8 +354,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
void *getDataMin(int32_t type); void *getDataMin(int32_t type);
void *getDataMax(int32_t type); void *getDataMax(int32_t type);
#define SET_DOUBLE_NULL(v) (*(uint64_t *)(v) = TSDB_DATA_DOUBLE_NULL)
#define SET_BIGINT_NULL(v) (*(uint64_t *)(v) = TSDB_DATA_BIGINT_NULL)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -67,7 +67,7 @@ typedef struct SResultRowEntryInfo { ...@@ -67,7 +67,7 @@ typedef struct SResultRowEntryInfo {
bool initialized:1; // output buffer has been initialized bool initialized:1; // output buffer has been initialized
bool complete:1; // query has completed bool complete:1; // query has completed
uint8_t isNullRes:6; // the result is null uint8_t isNullRes:6; // the result is null
uint16_t numOfRes; // num of output result in current buffer uint16_t numOfRes; // num of output result in current buffer. NOT NULL RESULT
} SResultRowEntryInfo; } SResultRowEntryInfo;
// determine the real data need to calculated the result // determine the real data need to calculated the result
......
...@@ -256,7 +256,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t ...@@ -256,7 +256,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
...@@ -264,15 +264,21 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t ...@@ -264,15 +264,21 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
size_t leftNumJoin = taosArrayGetSize(leftRowLocations); size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations); size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), leftNumJoin, rightNumJoin);
}
if (code == TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < leftNumJoin; ++i) { for (int32_t i = 0; i < leftNumJoin; ++i) {
for (int32_t j = 0; j < rightNumJoin; ++j) { for (int32_t j = 0; j < rightNumJoin; ++j) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SRowLocation *leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); SRowLocation *rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos); rightRow->pos);
++*nRows; ++*nRows;
} }
} }
}
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
......
...@@ -3845,14 +3845,17 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { ...@@ -3845,14 +3845,17 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) {
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) { for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data); SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data);
if (pInputInfo->hasResult) {
spreadTransferInfo(pInputInfo, pInfo); spreadTransferInfo(pInputInfo, pInfo);
} }
}
SET_VAL(GET_RES_INFO(pCtx), 1, 1); if (pInfo->hasResult) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3861,6 +3864,8 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3861,6 +3864,8 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pInfo->hasResult == true) { if (pInfo->hasResult == true) {
SET_DOUBLE_VAL(&pInfo->result, pInfo->max - pInfo->min); SET_DOUBLE_VAL(&pInfo->result, pInfo->max - pInfo->min);
} else {
GET_RES_INFO(pCtx)->isNullRes = 1;
} }
return functionFinalize(pCtx, pBlock); return functionFinalize(pCtx, pBlock);
} }
......
...@@ -415,7 +415,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { ...@@ -415,7 +415,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t metaSize = (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
*pDst = taosMemoryMalloc(metaSize); *pDst = taosMemoryMalloc(metaSize);
if (NULL == *pDst) { if (NULL == *pDst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册