提交 f9a64cbc 编写于 作者: H Haojun Liao

other: add some logs.

上级 da0d9c78
......@@ -821,13 +821,18 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
}
// do recovery step 1
streamSourceRecoverScanStep1(pTask);
tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr);
int64_t st = taosGetTimestampMs();
streamSourceRecoverScanStep1(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el);
// build msg to launch next step
SStreamRecoverStep2Req req;
code = streamBuildSourceRecover2Req(pTask, &req);
......@@ -853,20 +858,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
memcpy(serializedReq, &req, len);
// dispatch msg
SRpcMsg rpcMsg = {
.code = 0,
.contLen = len,
.msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
.pCont = serializedReq,
};
tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
SRpcMsg rpcMsg = {
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code;
int32_t code = 0;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
......
......@@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status:%d", pTask->id.idStr, status);
tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......
......@@ -165,20 +165,24 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
batchCnt++;
qDebug("task %d scan exec block num %d, block limit %d", pTask->id.taskId, batchCnt, batchSz);
qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz);
if (batchCnt >= batchSz) break;
if (batchCnt >= batchSz) {
break;
}
}
if (taosArrayGetSize(pRes) == 0) {
if (finished) {
taosArrayDestroy(pRes);
qDebug("task %d finish recover exec task ", pTask->id.taskId);
qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
break;
} else {
qDebug("task %d continue recover exec task ", pTask->id.taskId);
qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
continue;
}
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
......
......@@ -20,6 +20,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus);
streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version);
......@@ -197,7 +199,6 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req*
}
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
//
return streamScanExec(pTask, 100);
}
......@@ -210,8 +211,11 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor;
qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr);
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
}
return streamScanExec(pTask, 100);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册