From 9913ed1dd1c63c3549b83234a752176e6ad60930 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 12 Jan 2023 14:15:29 +0800 Subject: [PATCH] fix: concurrency issue --- include/libs/stream/tstream.h | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c00625c51c..c5352eee46 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -354,7 +354,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { - if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + int8_t type = pItem->type; + if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); if (pSubmitClone == NULL) { qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); @@ -365,19 +366,19 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone); - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE || - pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || + type == STREAM_INPUT__REF_DATA_BLOCK) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); - } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); - } else if (pItem->type == STREAM_INPUT__GET_RES) { + } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); } - if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } -- GitLab