提交 e80ae266 编写于 作者: L liuyao

refact

上级 f0f2215f
......@@ -2819,10 +2819,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doStreamIntervalSaveCheckpoint(pOperator);
pAPI->stateStore.streamStateCommit(pInfo->pState);
copyDataBlock(pInfo->pCheckpointRes, pBlock);
pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
......@@ -4248,7 +4244,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator);
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
pOperator->status = OP_RES_TO_RETURN;
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
......@@ -5539,8 +5534,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pAPI->stateStore.streamStateCommit(pInfo->pState);
pInfo->reCkBlock = true;
copyDataBlock(pInfo->pCheckpointRes, pBlock);
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册