提交 bccb31ac 编写于 作者: 5 54liuyao

fix:stream crash && add debug log

上级 ce768920
......@@ -536,6 +536,7 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pChildren;
SStreamState* pState;
SWinKey delKey;
uint64_t numOfDatapack;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
......
......@@ -1146,7 +1146,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
if (delTs > nextKey.ts) {
break;
}
endTs = delTs;
SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
if (delTs == nextKey.ts) {
code = streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
......@@ -1159,7 +1159,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
streamStateFreeCur(pCur);
pCur = streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
}
endTs = TMAX(ts, nextKey.ts - 1);
endTs = TMAX(delTs, nextKey.ts - 1);
if (code != TSDB_CODE_SUCCESS) {
break;
}
......
......@@ -1113,7 +1113,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
if (hasGroup) {
(*pRowIndex) += 1;
} else {
while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] < endWin.ekey)) {
while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
(*pRowIndex) += 1;
if ((*pRowIndex) == pDataBlockInfo->rows) {
break;
......
......@@ -2516,9 +2516,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64 , IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
ASSERT(pBlock->info.type != STREAM_INVERT);
......@@ -2734,6 +2736,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pInfo->numOfDatapack = 0;
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -4700,8 +4703,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
......@@ -4851,6 +4857,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pChildren = NULL;
pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pInfo->numOfDatapack = 0;
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册