提交 a9d11b58 编写于 作者: L Liu Jicong

enh(stream): recover

上级 0c942e62
......@@ -55,7 +55,6 @@ enum {
TASK_INPUT_STATUS__NORMAL = 1,
TASK_INPUT_STATUS__BLOCKED,
TASK_INPUT_STATUS__RECOVER,
TASK_INPUT_STATUS__PROCESSING,
TASK_INPUT_STATUS__STOP,
TASK_INPUT_STATUS__FAILED,
};
......@@ -320,17 +319,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
break;
}
ASSERT(0);
}
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
if (pSubmitClone == NULL) {
......@@ -443,13 +431,14 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceTaskId;
int32_t sourceVg;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
} SStreamTaskRecoverReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t rspTaskId;
int32_t reqTaskId;
int8_t inputStatus;
} SStreamTaskRecoverRsp;
......
......@@ -179,7 +179,7 @@ static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
int32_t taskId = pRsp->rspTaskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessRecoverRsp(pTask, pRsp);
return 0;
......
......@@ -796,7 +796,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
int32_t taskId = pRsp->rspTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverRsp(*ppTask, pRsp);
......
......@@ -32,10 +32,10 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamExec(SStreamTask* pTask);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......
......@@ -189,7 +189,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
#if 0
if (pTask->execType != TASK_EXEC__NONE) {
#endif
streamExec(pTask, pTask->pMsgCb);
streamExec(pTask);
#if 0
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
......@@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// 3.2 dispatch / sink
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
}
return 0;
......@@ -233,26 +233,55 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
return 0;
}
// continue dispatch
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
return 0;
}
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamExec(pTask, pTask->pMsgCb);
streamExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
}
return 0;
}
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
//
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = pTask->inputStatus;
pCont->streamId = pTask->streamId;
pCont->reqTaskId = pTask->taskId;
pCont->rspTaskId = pReq->upstreamTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp);
tmsgSendRsp(pRsp);
return 0;
}
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
//
if (pRsp->inputStatus == TASK_INPUT_STATUS__NORMAL) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
streamProcessRunReq(pTask);
if (pTask->isDataScan) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVERING;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 100) < 0) {
return -1;
}
} else {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->taskStatus = TASK_STATUS__NORMAL;
}
}
return 0;
}
......@@ -262,10 +291,10 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE);
streamExec(pTask, pTask->pMsgCb);
streamExec(pTask);
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
return 0;
}
......
......@@ -438,7 +438,7 @@ FAIL:
return code;
}
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
#if 1
int8_t old =
......
......@@ -141,7 +141,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask, pTask->pMsgCb);
streamDispatch(pTask);
}
}
......@@ -229,7 +229,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
// TODO: handle version
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
int32_t streamExec(SStreamTask* pTask) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) return -1;
while (1) {
......
......@@ -19,8 +19,8 @@ int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecover
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
......@@ -29,8 +29,8 @@ int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* p
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
......@@ -38,7 +38,8 @@ int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* p
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->reqTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->rspTaskId) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
......@@ -47,7 +48,8 @@ int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecover
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->reqTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->rspTaskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1;
tEndDecode(pDecoder);
return 0;
......@@ -125,7 +127,7 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
if (streamPipelineExec(pTask, 10) < 0) {
if (streamPipelineExec(pTask, 100) < 0) {
// set fail
return -1;
}
......
......@@ -294,7 +294,7 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
assert(cnt <= pArray->size);
pArray->size = pArray->size - cnt;
if (pArray->size == 0) {
if (pArray->size == 0 || cnt == 0) {
return;
}
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册