diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e5e9b45576edc372315450bd25c502d40abcfa15..82f079e2fb283732c71e0c16604e991568ce59ed 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -107,7 +107,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; -///// numOfBlocks总和 日志 + qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id); ASSERT(pInfo->validBlockIndex == 0); ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 091b10a63e7c6aded973e20d7941294507bfd74c..a1426e2a969d9b2f2f5a866efbd63bd6f057ca17 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) { SWinKey key = {.groupId = groupId, .ts = pRow->key}; int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len); + qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code); ASSERT(code == TSDB_CODE_SUCCESS); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 05f26332f28761702c274360ba403e65dea0ba25..71e1068fb39831d07040b61c99e7a19c7b3494db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1798,7 +1798,7 @@ FETCH_NEXT_BLOCK: /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } -//////todo + int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 54be30028e1efa734b43e0302c1a772a5d8f467b..62d68d5ca25a7c3812872be8008766e91d40020d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1538,16 +1538,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0); if (code == TSDB_CODE_SUCCESS) { STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval); - qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, + qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey, tmpKey.groupId, mark); } else { STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); - qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey, + qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey, key->groupId, mark); } } else { STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); - qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey, + qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey, key->groupId, mark); } streamStateFreeCur(pCur);