提交 899e4d33 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 de44c916
...@@ -957,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -957,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 1.deserialize msg and build task // 1.deserialize msg and build task
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
return -1; return -1;
} }
...@@ -974,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -974,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 2.save task, use the newest commit version as the initial start version of stream task. // 2.save task, use the newest commit version as the initial start version of stream task.
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
streamMetaGetNumOfTasks(pTq->pStreamMeta));
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }
...@@ -989,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -989,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); pTask->status.taskStatus, numOfTasks);
return 0; return 0;
} }
......
...@@ -375,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -375,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
} }
if (pTask->fillHistory) { if (pTask->fillHistory) {
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
streamTaskCheckDownstream(pTask, ver); streamTaskCheckDownstream(pTask, ver);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册