提交 e0cb8aa5 编写于 作者: H Haojun Liao

fix(stream): don't the initial task status and do some internal refactor.

上级 eb0e1f84
...@@ -31,7 +31,6 @@ extern "C" { ...@@ -31,7 +31,6 @@ extern "C" {
#ifndef _STREAM_H_ #ifndef _STREAM_H_
#define _STREAM_H_ #define _STREAM_H_
typedef void (*_free_reader_fn_t)(void*);
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
enum { enum {
...@@ -574,7 +573,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamT ...@@ -574,7 +573,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamT
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
......
...@@ -659,17 +659,22 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -659,17 +659,22 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
}; };
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask && atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) { if (pTask) {
rsp.status = 1; rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0;
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("tq recv task check req(reqId:0x%" PRIx64
") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
") %d at node %d, check req from task %d at node %d, rsp status %d",
taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
} }
if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SEncoder encoder; SEncoder encoder;
int32_t code; int32_t code;
int32_t len; int32_t len;
...@@ -687,13 +692,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -687,13 +692,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
tEncodeSStreamTaskCheckRsp(&encoder, &rsp); tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg rspMsg = { SRpcMsg rspMsg = { .code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info };
.code = 0,
.pCont = buf,
.contLen = sizeof(SMsgHead) + len,
.info = pMsg->info,
};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
return 0; return 0;
} }
...@@ -709,8 +708,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 ...@@ -709,8 +708,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
tDecoderClear(&decoder); tDecoderClear(&decoder);
return -1; return -1;
} }
tDecoderClear(&decoder);
tDecoderClear(&decoder);
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
...@@ -764,8 +763,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -764,8 +763,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamTaskCheckDownstream(pTask, sversion); streamTaskCheckDownstream(pTask, sversion);
} }
tqDebug("vgId:%d s-task:%s is deployed from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
return 0; return 0;
} }
...@@ -1117,7 +1116,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1117,7 +1116,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
......
...@@ -183,7 +183,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* ...@@ -183,7 +183,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return -1; return -1;
} }
pTask->status.taskStatus = STREAM_STATUS__NORMAL;
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
return 0; return 0;
} }
...@@ -215,22 +214,6 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -215,22 +214,6 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
} }
} }
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = NULL;
taosRLockLatch(&pMeta->lock);
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (p != NULL) {
if ((*p) != NULL && atomic_load_8(&((*p)->status.taskStatus)) != TASK_STATUS__DROPPING) {
pTask = *p;
atomic_add_fetch_32(&pTask->refCnt, 1);
}
}
taosRUnLockLatch(&pMeta->lock);
return pTask;
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
......
...@@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { ...@@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
} else if (pTask->taskLevel == TASK_LEVEL__SINK) { } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
} }
return 0; return 0;
} }
...@@ -125,6 +126,7 @@ int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* ...@@ -125,6 +126,7 @@ int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);
if (pRsp->status == 1) { if (pRsp->status == 1) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
bool found = false; bool found = false;
...@@ -135,7 +137,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -135,7 +137,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
break; break;
} }
} }
if (!found) return -1;
if (!found) {
return -1;
}
int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
...@@ -144,7 +150,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -144,7 +150,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} }
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (pRsp->reqId != pTask->checkReqId) return -1; if (pRsp->reqId != pTask->checkReqId) {
return -1;
}
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -164,6 +173,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { ...@@ -164,6 +173,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
return qStreamRestoreParam(exec); return qStreamRestoreParam(exec);
} }
int32_t streamSetStatusNormal(SStreamTask* pTask) { int32_t streamSetStatusNormal(SStreamTask* pTask) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
return 0; return 0;
...@@ -224,8 +234,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { ...@@ -224,8 +234,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
// agg // agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) { int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo); pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo);
qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream);
return 0; return 0;
} }
...@@ -244,6 +254,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { ...@@ -244,6 +254,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
if (pTask->taskLevel == TASK_LEVEL__AGG) { if (pTask->taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1); int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1);
qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
streamAggChildrenRecoverFinish(pTask); streamAggChildrenRecoverFinish(pTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册