提交 2020ea0a 编写于 作者: 5 54liuyao

fix:stream fill crash

上级 e15d8875
......@@ -1962,6 +1962,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
memset(pBuf, 0, sizeof(pBuf));
char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
dataSize = TMIN(dataSize, 50);
memcpy(pBuf, varDataVal(pData), dataSize);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
......
......@@ -754,6 +754,23 @@ typedef struct SStreamPartitionOperatorInfo {
SSDataBlock* pDelRes;
} SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup;
SSDataBlock* pRes;
......
......@@ -111,22 +111,6 @@ typedef struct SStreamFillInfo {
int32_t delIndex;
} SStreamFillInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
......
......@@ -2056,6 +2056,8 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
} else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
taosVariantDestroy(&pExprInfo->base.pParam[j].param);
}
}
......
......@@ -709,6 +709,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
pFillSup->pResMap = NULL;
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
pFillSup->cur.pRowVal = NULL;
cleanupExprSupp(&pFillSup->notFillExprSup);
taosMemoryFree(pFillSup);
return NULL;
......@@ -1417,25 +1418,13 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock
blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId;
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, pInfo->primaryTsCol);
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pDstBlock->info.rows, &pDstBlock->info);
int32_t numOfNotFill = pInfo->pFillSup->numOfAllCols - pInfo->pFillSup->numOfFillCols;
for (int32_t i = 0; i < numOfNotFill; ++i) {
SFillColInfo* pCol = &pInfo->pFillSup->pAllColInfo[i + pInfo->pFillSup->numOfFillCols];
ASSERT(pCol->notFillCol);
SExprInfo* pExpr = pCol->pExpr;
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
int32_t dstSlotId = pExpr->base.resSchema.slotId;
pDstBlock->info.rows = 0;
pSup = &pInfo->pFillSup->notFillExprSup;
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId;
SColumnInfoData* pDst1 = taosArrayGet(pDstBlock->pDataBlock, dstSlotId);
SColumnInfoData* pSrc1 = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
colDataAssign(pDst1, pSrc1, pDstBlock->info.rows, &pDstBlock->info);
}
blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
}
......@@ -1577,6 +1566,14 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
destroyStreamFillSupporter(pFillSup);
return NULL;
}
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamFillSupporter(pFillSup);
return NULL;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pFillSup->pResMap = tSimpleHashInit(16, hashFn);
pFillSup->hasDelete = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册