提交 bbb8aaa0 编写于 作者: H Haojun Liao

refactor: add some logs.

上级 b69137df
...@@ -880,6 +880,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -880,6 +880,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
// do recovery step 2 // do recovery step 2
int64_t st = taosGetTimestampMs();
tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);
code = streamSourceRecoverScanStep2(pTask, sversion); code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
...@@ -905,6 +908,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -905,6 +908,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1; return -1;
} }
double el = (taosGetTimestampMs() - st)/ 1000.0;
tqDebug("s-task:%s step2 recover finished, el:%.2f s", pTask->id.idStr, el);
// dispatch recover finish req to all related downstream task // dispatch recover finish req to all related downstream task
code = streamDispatchRecoverFinishReq(pTask); code = streamDispatchRecoverFinishReq(pTask);
if (code < 0) { if (code < 0) {
......
...@@ -256,6 +256,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -256,6 +256,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
ASSERT(0); ASSERT(0);
return 0; return 0;
} }
// continue dispatch // continue dispatch
streamDispatch(pTask); streamDispatch(pTask);
return 0; return 0;
......
...@@ -261,6 +261,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov ...@@ -261,6 +261,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
buf = rpcMallocCont(sizeof(SMsgHead) + tlen); buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -512,6 +513,7 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -512,6 +513,7 @@ int32_t streamDispatch(SStreamTask* pTask) {
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr);
return 0; return 0;
} }
......
...@@ -224,8 +224,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { ...@@ -224,8 +224,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
// serialize // serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d", pTask->id.idStr, qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr,
pTask->fixedEpDispatcher.taskId); pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
req.taskId = pTask->fixedEpDispatcher.taskId; req.taskId = pTask->fixedEpDispatcher.taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册