From 6c13491428174a039eb9fa598dadd5353d8f259e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 8 Jun 2023 16:55:41 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 --- source/dnode/mnode/impl/src/mndScheduler.c | 3 +-- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamData.c | 20 -------------------- 4 files changed, 2 insertions(+), 26 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d80601178..3ce4367724 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -132,7 +132,6 @@ typedef struct { // ref data block, for delete typedef struct { int8_t type; - int64_t ver; SSDataBlock* pBlock; } SStreamRefDataBlock; @@ -207,8 +206,6 @@ void* streamQueueNextItem(SStreamQueue* pQueue); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); -SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); - typedef struct { char* qmsg; void* pExecutor; // not applicable to encoder and decoder diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c6d8bb5ffe..1ab11154f0 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -396,8 +396,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); - mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); - + mDebug("s-task:0x%x set time window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); // all the source tasks dispatch result to a single agg node. setFixedDownstreamEpInfo(pTask, pDownstreamTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3befd5d55d..bfb28c0919 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -17,7 +17,7 @@ #include "ttimer.h" #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (50) +#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define ONE_MB_F (1048576.0) #define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 84b5eb3ab7..37923ca807 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -164,26 +164,6 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm return 0; } -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { - atomic_add_fetch_32(pDataSubmit->dataRef, 1); -} - -SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) { - int32_t len = 0; - if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { - len = pSubmit->submit.msgLen; - } - - SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len); - if (pSubmitClone == NULL) { - return NULL; - } - - streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); - return pSubmitClone; -} - SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; -- GitLab