diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c9749906d01ea797eab02bbe72fe259463f365e0..74361ec31937fb8cdaa0192231e3186b338f7aee 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -957,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 1.deserialize msg and build task SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); return -1; } @@ -974,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 2.save task, use the newest commit version as the initial start version of stream task. taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, - streamMetaGetNumOfTasks(pTq->pStreamMeta)); + tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } @@ -989,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, - pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + pTask->status.taskStatus, numOfTasks); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e091b6864b95d8c1b5dd601acd9cf570ebdcafb0..33375fe921b2971a96c36761182db63e61e95ad0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -375,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } if (pTask->fillHistory) { - pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); streamTaskCheckDownstream(pTask, ver); } }