diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index df8847c26de923ad98b53b3f21bc69af7f715c6f..361cd2cacc1d2ba963b65387d2ce90e6b1f77ab1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -287,12 +287,12 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); - - int32_t total = taosQueueItemSize(pTask->inputQueue->queue); + int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId, pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total); + + taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { taosWriteQitem(pTask->inputQueue->queue, pItem);