提交 c647ded0 编写于 作者: H Haojun Liao

fix(stream): add some logs.

上级 bc474e64
......@@ -826,6 +826,9 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
streamSourceRecoverScanStep1(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......@@ -842,7 +845,6 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
return 0;
}
......@@ -852,13 +854,14 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
return -1;
}
memcpy(serializedReq, &req, len);
// dispatch msg
tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
tqDebug("s-task:%s start recover block stage(step 2)", pTask->id.idStr);
SRpcMsg rpcMsg = {
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
......@@ -870,7 +873,8 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
int32_t code = 0;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
return -1;
}
......@@ -912,7 +916,6 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
streamMetaSaveTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......
......@@ -270,8 +270,12 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) {
goto FAIL;
if (buf) {
rpcFreeCont(buf);
}
return code;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
......@@ -280,13 +284,10 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg);
qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId);
qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr,
pReq->taskId, vgId);
return 0;
FAIL:
if (buf) rpcFreeCont(buf);
return code;
}
int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
......@@ -407,13 +408,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
req.taskId = downstreamTaskId;
qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
pTask->selfChildId, blockNum, downstreamTaskId, vgId);
if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) {
goto FAIL_FIXED_DISPATCH;
}
code = 0;
FAIL_FIXED_DISPATCH:
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
......@@ -427,6 +430,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
int32_t vgSz = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -442,6 +446,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
goto FAIL_SHUFFLE_DISPATCH;
}
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
pReqs[i].taskId = pVgInfo->taskId;
}
......@@ -468,33 +473,41 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
}
}
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
blockNum, vgSz);
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
// send
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
pReqs[i].blockNum, pVgInfo->vgId);
if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
}
}
code = 0;
FAIL_SHUFFLE_DISPATCH:
if (pReqs) {
for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen);
}
taosMemoryFree(pReqs);
for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen);
}
return code;
taosMemoryFree(pReqs);
}
return 0;
return code;
}
int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr,
taosQueueItemSize(pTask->outputQueue->queue));
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
numOfElems);
}
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
......@@ -504,7 +517,7 @@ int32_t streamDispatch(SStreamTask* pTask) {
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) {
qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr);
qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
......@@ -516,10 +529,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
code = -1;
streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
goto FREE;
}
FREE:
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
return code;
......
......@@ -212,7 +212,7 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor;
qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr);
qDebug("s-task:%s recover step2 (blocking stage) started", pTask->id.idStr);
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
}
......@@ -220,12 +220,13 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
}
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
SStreamRecoverFinishReq req = {
.streamId = pTask->id.streamId,
.childId = pTask->selfChildId,
};
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d", pTask->id.idStr,
pTask->fixedEpDispatcher.taskId);
req.taskId = pTask->fixedEpDispatcher.taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册