提交 fc225a58 编写于 作者: L Liu Jicong

enh(stream): recover process

上级 d9a4ea30
...@@ -154,7 +154,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { ...@@ -154,7 +154,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
} }
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
//
return queue->qItem;
}
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
...@@ -226,8 +229,6 @@ typedef struct { ...@@ -226,8 +229,6 @@ typedef struct {
int32_t nodeId; int32_t nodeId;
int32_t childId; int32_t childId;
int32_t taskId; int32_t taskId;
// int64_t checkpointVer;
// int64_t processedVer;
SEpSet epSet; SEpSet epSet;
} SStreamChildEpInfo; } SStreamChildEpInfo;
...@@ -372,15 +373,6 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc ...@@ -372,15 +373,6 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
return 0; return 0;
} }
typedef struct {
int32_t reserved;
} SStreamTaskDeployRsp;
typedef struct {
// SMsgHead head;
SStreamTask* task;
} SStreamTaskDeployReq;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t streamId; int64_t streamId;
...@@ -478,7 +470,18 @@ typedef struct { ...@@ -478,7 +470,18 @@ typedef struct {
} SStreamRecoverDownstreamRsp; } SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq); int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp); int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t waitingRspCnt;
int32_t totReq;
SArray* info; // SArray<SArray<SStreamCheckpointInfo>*>
} SStreamRecoverStatus;
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
...@@ -504,7 +507,7 @@ typedef struct SStreamMeta { ...@@ -504,7 +507,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb; TTB* pTaskDb;
TTB* pStateDb; TTB* pStateDb;
SHashObj* pTasks; SHashObj* pTasks;
SHashObj* pRecoveringState; SHashObj* pRecoverStatus;
void* ahandle; void* ahandle;
TXN txn; TXN txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
......
...@@ -33,7 +33,7 @@ typedef struct { ...@@ -33,7 +33,7 @@ typedef struct {
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask); int32_t streamExec(SStreamTask* pTask);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum); int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
int32_t streamDispatch(SStreamTask* pTask); int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
......
...@@ -104,7 +104,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { ...@@ -104,7 +104,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return 0; return 0;
} }
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status; int8_t status;
...@@ -136,7 +136,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -136,7 +136,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
pRsp->pCont = buf; pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
tFreeStreamDispatchReq(pReq);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
...@@ -183,6 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -183,6 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
pReq->upstreamTaskId); pReq->upstreamTaskId);
streamTaskEnqueue(pTask, pReq, pRsp); streamTaskEnqueue(pTask, pReq, pRsp);
tFreeStreamDispatchReq(pReq);
if (exec) { if (exec) {
streamTryExec(pTask); streamTryExec(pTask);
...@@ -246,25 +246,21 @@ int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, ...@@ -246,25 +246,21 @@ int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq,
return 0; return 0;
} }
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
if (pRsp->inputStatus == TASK_INPUT_STATUS__NORMAL) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// scan data to recover // scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER; pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVERING; pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 100) < 0) { if (streamPipelineExec(pTask, 100, true) < 0) {
return -1; return -1;
} }
} else { } else {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->taskStatus = TASK_STATUS__NORMAL; pTask->taskStatus = TASK_STATUS__NORMAL;
} }
}
return 0; return 0;
} }
......
...@@ -93,7 +93,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock ...@@ -93,7 +93,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
} }
#endif #endif
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
...@@ -125,6 +125,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -125,6 +125,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
break; break;
} }
if (dispatch) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) { if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
...@@ -145,6 +146,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -145,6 +146,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
streamDispatch(pTask); streamDispatch(pTask);
} }
} }
}
return 0; return 0;
} }
......
...@@ -132,6 +132,49 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh ...@@ -132,6 +132,49 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
return 0; return 0;
} }
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq) {
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
return 0;
}
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp) {
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
int32_t sz = taosArrayGetSize(pRsp->checkpointVer);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo* pInfo = taosArrayGet(pRsp->checkpointVer, i);
if (tEncodeSStreamCheckpointInfo(pEncoder, pInfo) < 0) return -1;
}
return 0;
}
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp) {
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
pRsp->checkpointVer = taosArrayInit(sz, sizeof(SStreamCheckpointInfo));
if (pRsp->checkpointVer == NULL) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo info;
if (tDecodeSStreamCheckpointInfo(pDecoder, &info) < 0) return -1;
taosArrayPush(pRsp->checkpointVer, &info);
}
return 0;
}
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL; void* buf = NULL;
...@@ -223,25 +266,129 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -223,25 +266,129 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0; return 0;
} }
int32_t streamFetchDownstreamStatus(SStreamTask* pTask) { int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
int32_t taskId = pVgInfo->taskId;
int32_t nodeId = pVgInfo->vgId;
SStreamRecoverDownstreamReq req = {
.streamId = pTask->taskId,
.downstreamTaskId = taskId,
.taskId = pTask->taskId,
};
int32_t tlen;
int32_t code;
tEncodeSize(tEncodeSStreamTaskRecoverReq, &req, tlen, code);
if (code < 0) {
return -1;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if (tEncodeSStreamTaskRecoverReq(&encoder, &req) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
tEncoderClear(&encoder);
((SMsgHead*)buf)->vgId = htonl(nodeId);
SRpcMsg msg = {
.pCont = buf, .contLen = sizeof(SMsgHead) + tlen,
/*.msgType = */
};
tmsgSendReq(&pVgInfo->epSet, &msg);
return 0;
}
int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
// set self status to recover_phase1 // set self status to recover_phase1
// build fetch status msg SStreamRecoverStatus* pRecover;
// send fetch msg
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM); atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
pRecover = taosHashGet(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t));
if (pRecover == NULL) {
pRecover = taosMemoryCalloc(1, sizeof(SStreamRecoverStatus));
if (pRecover == NULL) {
return -1;
}
pRecover->info = taosArrayInit(0, sizeof(void*));
if (pRecover->info == NULL) {
taosMemoryFree(pRecover);
return -1;
}
taosHashPut(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t), &pRecover, sizeof(void*));
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
pRecover->totReq = 1;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t numOfDownstream = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
pRecover->totReq = numOfDownstream;
for (int32_t i = 0; i < numOfDownstream; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pTask->shuffleDispatcher.dbInfo.pVgroupInfos, i);
streamFetchRecoverStatus(pTask, pVgInfo);
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
return 0; return 0;
} }
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) { int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
// if failed, set timer and retry // if failed, set timer and retry
// if successful // if successful
// add rsp state to partial recover hash int32_t taskId = pTask->taskId;
// if complete, begin actual recover SStreamRecoverStatus* pRecover = taosHashGet(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
if (pRecover == NULL) {
return -1;
}
taosArrayPush(pRecover->info, &pRsp->checkpointVer);
int32_t leftRsp = atomic_sub_fetch_32(&pRecover->waitingRspCnt, 1);
ASSERT(leftRsp >= 0);
if (leftRsp == 0) {
ASSERT(taosArrayGetSize(pRecover->info) == pRecover->totReq);
// srcNodeId -> SStreamCheckpointInfo*
SHashObj* pFinalChecks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (pFinalChecks == NULL) return -1;
for (int32_t i = 0; i < pRecover->totReq; i++) {
SArray* pChecks = taosArrayGetP(pRecover->info, i);
int32_t sz = taosArrayGetSize(pChecks);
for (int32_t j = 0; j < sz; j++) {
SStreamCheckpointInfo* pOneCheck = taosArrayGet(pChecks, j);
SStreamCheckpointInfo* pCheck = taosHashGet(pFinalChecks, &pOneCheck->srcNodeId, sizeof(int32_t));
if (pCheck == NULL) {
pCheck = taosMemoryCalloc(1, sizeof(SStreamCheckpointInfo));
pCheck->srcNodeId = pOneCheck->srcNodeId;
pCheck->srcChildId = pOneCheck->srcChildId;
pCheck->stateProcessedVer = pOneCheck->stateProcessedVer;
taosHashPut(pFinalChecks, &pCheck->srcNodeId, sizeof(int32_t), &pCheck, sizeof(void*));
} else {
pCheck->stateProcessedVer = TMIN(pCheck->stateProcessedVer, pOneCheck->stateProcessedVer);
}
}
}
// load local state
//
// recover
//
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 10000, true) < 0) {
return -1;
}
}
taosHashCleanup(pFinalChecks);
taosHashRemove(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
}
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册