未验证 提交 fca91b3c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21558 from taosdata/fix/TD-24520

reset sream fill block index
......@@ -1002,9 +1002,10 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.id.groupId = groupId;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
pRes->info.id.groupId = groupId;
pInfo->srcRowIndex++;
if (pInfo->srcRowIndex == 0) {
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
......@@ -1242,7 +1243,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
SSDataBlock* fillResult = NULL;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows) {
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
// If there are delete datablocks, we receive them first.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -1281,7 +1282,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
case STREAM_PULL_DATA: {
doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = 0;
pInfo->srcRowIndex = -1;
} break;
case STREAM_CREATE_CHILD_TABLE: {
return pBlock;
......@@ -1497,7 +1498,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
goto _error;
}
pInfo->srcRowIndex = 0;
pInfo->srcRowIndex = -1;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册