未验证 提交 31202e83 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18334 from taosdata/fix/TD-20440

fix: crash caused by stream fill
...@@ -1962,6 +1962,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1962,6 +1962,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
memset(pBuf, 0, sizeof(pBuf)); memset(pBuf, 0, sizeof(pBuf));
char* pData = colDataGetVarData(pColInfoData, j); char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
dataSize = TMIN(dataSize, 50);
memcpy(pBuf, varDataVal(pData), dataSize); memcpy(pBuf, varDataVal(pData), dataSize);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
......
...@@ -754,6 +754,23 @@ typedef struct SStreamPartitionOperatorInfo { ...@@ -754,6 +754,23 @@ typedef struct SStreamPartitionOperatorInfo {
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
} SStreamPartitionOperatorInfo; } SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo { typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup; SStreamFillSupporter* pFillSup;
SSDataBlock* pRes; SSDataBlock* pRes;
......
...@@ -111,22 +111,6 @@ typedef struct SStreamFillInfo { ...@@ -111,22 +111,6 @@ typedef struct SStreamFillInfo {
int32_t delIndex; int32_t delIndex;
} SStreamFillInfo; } SStreamFillInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
......
...@@ -2056,6 +2056,8 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { ...@@ -2056,6 +2056,8 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
} else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
taosVariantDestroy(&pExprInfo->base.pParam[j].param);
} }
} }
......
...@@ -709,6 +709,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { ...@@ -709,6 +709,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
pFillSup->pResMap = NULL; pFillSup->pResMap = NULL;
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal); releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
pFillSup->cur.pRowVal = NULL; pFillSup->cur.pRowVal = NULL;
cleanupExprSupp(&pFillSup->notFillExprSup);
taosMemoryFree(pFillSup); taosMemoryFree(pFillSup);
return NULL; return NULL;
...@@ -1417,25 +1418,13 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock ...@@ -1417,25 +1418,13 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock
blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows); blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false); setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL); projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId;
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, pInfo->primaryTsCol);
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pDstBlock->info.rows, &pDstBlock->info);
int32_t numOfNotFill = pInfo->pFillSup->numOfAllCols - pInfo->pFillSup->numOfFillCols;
for (int32_t i = 0; i < numOfNotFill; ++i) {
SFillColInfo* pCol = &pInfo->pFillSup->pAllColInfo[i + pInfo->pFillSup->numOfFillCols];
ASSERT(pCol->notFillCol);
SExprInfo* pExpr = pCol->pExpr; pDstBlock->info.rows = 0;
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; pSup = &pInfo->pFillSup->notFillExprSup;
int32_t dstSlotId = pExpr->base.resSchema.slotId; setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId;
SColumnInfoData* pDst1 = taosArrayGet(pDstBlock->pDataBlock, dstSlotId);
SColumnInfoData* pSrc1 = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
colDataAssign(pDst1, pSrc1, pDstBlock->info.rows, &pDstBlock->info);
}
blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol); blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
} }
...@@ -1577,6 +1566,14 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod ...@@ -1577,6 +1566,14 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
destroyStreamFillSupporter(pFillSup); destroyStreamFillSupporter(pFillSup);
return NULL; return NULL;
} }
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamFillSupporter(pFillSup);
return NULL;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pFillSup->pResMap = tSimpleHashInit(16, hashFn); pFillSup->pResMap = tSimpleHashInit(16, hashFn);
pFillSup->hasDelete = false; pFillSup->hasDelete = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册