From 899e4d3350610d456bf3ae6c60b15272f6e4f989 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 18:13:58 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 8 +++++--- source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c9749906d0..74361ec319 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -957,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 1.deserialize msg and build task SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); return -1; } @@ -974,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 2.save task, use the newest commit version as the initial start version of stream task. taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, - streamMetaGetNumOfTasks(pTq->pStreamMeta)); + tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } @@ -989,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, - pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + pTask->status.taskStatus, numOfTasks); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e091b6864b..33375fe921 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -375,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } if (pTask->fillHistory) { - pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); streamTaskCheckDownstream(pTask, ver); } } -- GitLab