diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 4b4ef55485891d44b3c99709c955718c9f6398c5..103f8071914b4f9f1ac2914b7335c7217ca70546 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -31,7 +31,6 @@ extern "C" {
 #ifndef _STREAM_H_
 #define _STREAM_H_
 
-typedef void (*_free_reader_fn_t)(void*);
 typedef struct SStreamTask SStreamTask;
 
 enum {
@@ -574,7 +573,6 @@ int32_t      streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamT
 int32_t      streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
 int32_t      streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
 
-SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
 SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
 void         streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
 void         streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 5d4fdbe5d850d671eca949bcf257e740407c6709..1230a352d93c7fcea6342a7430634e56d17d31c3 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -659,17 +659,24 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
   };
 
   SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
-  if (pTask && atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
-    rsp.status = 1;
+  if (pTask) {
+    // Snapshot the status while the reference is held: pTask must not be dereferenced after it is released.
+    int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
+    rsp.status = (taskStatus == TASK_STATUS__NORMAL) ? 1 : 0;
+    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
+
+    tqDebug("tq recv task check req(reqId:0x%" PRIx64
+            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
+            rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, taskStatus, rsp.upstreamTaskId,
+            rsp.upstreamNodeId, rsp.status);
   } else {
     rsp.status = 0;
+    tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
+            ") %d at node %d, check req from task %d at node %d, rsp status %d",
+            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
+            rsp.status);
   }
 
-  if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-
-  tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
-          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
-
   SEncoder encoder;
   int32_t  code;
   int32_t  len;
@@ -687,13 +694,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
   tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
   tEncoderClear(&encoder);
 
-  SRpcMsg rspMsg = {
-      .code = 0,
-      .pCont = buf,
-      .contLen = sizeof(SMsgHead) + len,
-      .info = pMsg->info,
-  };
-
+  SRpcMsg rspMsg = { .code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info };
   tmsgSendRsp(&rspMsg);
   return 0;
 }
@@ -709,8 +710,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
     tDecoderClear(&decoder);
     return -1;
   }
-  tDecoderClear(&decoder);
 
+  tDecoderClear(&decoder);
   tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
           rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
 
@@ -764,8 +765,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
     streamTaskCheckDownstream(pTask, sversion);
   }
 
-  tqDebug("vgId:%d s-task:%s is deployed from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
-          pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
+  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
+          pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
   return 0;
 }
 
@@ -1117,7 +1118,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
     return 0;
   }
 
-  SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId);
+  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
   if (pTask != NULL) {
     if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
       tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index c45c7003755ba7a5e0f9095979ac5a9e444fe065..fecc01f295bacd8a23d0d7331d1a5c362759b4c4 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -183,7 +183,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
     return -1;
   }
 
-  pTask->status.taskStatus = STREAM_STATUS__NORMAL;
   taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
   return 0;
 }
@@ -215,22 +214,6 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
   }
 }
 
-SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
-  SStreamTask* pTask = NULL;
-  taosRLockLatch(&pMeta->lock);
-
-  SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
-  if (p != NULL) {
-    if ((*p) != NULL && atomic_load_8(&((*p)->status.taskStatus)) != TASK_STATUS__DROPPING) {
-      pTask = *p;
-      atomic_add_fetch_32(&pTask->refCnt, 1);
-    }
-  }
-
-  taosRUnLockLatch(&pMeta->lock);
-  return pTask;
-}
-
 void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
   if (ppTask) {
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 6c24e6983230f803864175026e6c1b2abcc09aae..03afc0692d8ce98b9e82fe2ec2ee02da80307119 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
   } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
     atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
   }
+
   return 0;
 }
 
@@ -125,6 +126,7 @@ int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
 int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
   qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId,
          pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);
+
   if (pRsp->status == 1) {
     if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
       bool found = false;
@@ -135,7 +137,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
           break;
         }
       }
-      if (!found) return -1;
+
+      if (!found) {
+        return -1;
+      }
+
       int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
       ASSERT(left >= 0);
       if (left == 0) {
@@ -144,7 +150,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
         streamTaskLaunchRecover(pTask, version);
       }
     } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
-      if (pRsp->reqId != pTask->checkReqId) return -1;
+      if (pRsp->reqId != pTask->checkReqId) {
+        return -1;
+      }
+
       streamTaskLaunchRecover(pTask, version);
     } else {
       ASSERT(0);
@@ -164,6 +173,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
   void* exec = pTask->exec.pExecutor;
   return qStreamRestoreParam(exec);
 }
+
 int32_t streamSetStatusNormal(SStreamTask* pTask) {
   atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
   return 0;
@@ -224,8 +234,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
 
 // agg
 int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
-  void* exec = pTask->exec.pExecutor;
   pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo);
+  qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream);
   return 0;
 }
 
@@ -244,6 +254,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
 int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
   if (pTask->taskLevel == TASK_LEVEL__AGG) {
     int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1);
+    qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left);
     ASSERT(left >= 0);
     if (left == 0) {
       streamAggChildrenRecoverFinish(pTask);