diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 8aacfec3970674449ae4c5ea252d31f8c20b97b8..9355d76dcbfd1a993e889edc70790e332f9b0435 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -154,7 +154,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
   atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
 }
 
-static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
+static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
+  //
+  return queue->qItem;
+}
 
 static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
   int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
@@ -226,9 +229,7 @@ typedef struct {
   int32_t nodeId;
   int32_t childId;
   int32_t taskId;
-  // int64_t checkpointVer;
-  // int64_t processedVer;
-  SEpSet epSet;
+  SEpSet  epSet;
 } SStreamChildEpInfo;
 
 typedef struct {
@@ -372,15 +373,6 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
   return 0;
 }
 
-typedef struct {
-  int32_t reserved;
-} SStreamTaskDeployRsp;
-
-typedef struct {
-  // SMsgHead head;
-  SStreamTask* task;
-} SStreamTaskDeployReq;
-
 typedef struct {
   SMsgHead head;
   int64_t  streamId;
@@ -478,7 +470,18 @@ typedef struct {
 } SStreamRecoverDownstreamRsp;
 
 int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
-int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
+int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
+
+int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
+int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
+
+typedef struct {
+  int64_t streamId;
+  int32_t taskId;
+  int32_t waitingRspCnt;
+  int32_t totReq;
+  SArray* info;  // SArray<SArray<SStreamCheckpointInfo>*>
+} SStreamRecoverStatus;
 
 int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
 int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
@@ -504,7 +507,7 @@ typedef struct SStreamMeta {
   TTB*         pTaskDb;
   TTB*         pStateDb;
   SHashObj*    pTasks;
-  SHashObj*    pRecoveringState;
+  SHashObj*    pRecoverStatus;
   void*        ahandle;
   TXN          txn;
   FTaskExpand* expandFunc;
diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h
index 1ff27f125339569a3bcabc4a0a1d9a7fe4872fe2..7dc8d822e9893b7518e38bc45ac170ef8e2cf19d 100644
--- a/source/libs/stream/inc/streamInc.h
+++ b/source/libs/stream/inc/streamInc.h
@@ -33,7 +33,7 @@ typedef struct {
 static SStreamGlobalEnv streamEnv;
 
 int32_t streamExec(SStreamTask* pTask);
-int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
+int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
 
 int32_t streamDispatch(SStreamTask* pTask);
 int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 1f8d742de4a4c9bd8b728d6917964792ffcb38f3..6da7d4fd59028ac09a4493dfcd311769c619d52d 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -104,7 +104,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
   return 0;
 }
 
-int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
+int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
   SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
   int8_t            status;
 
@@ -136,7 +136,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
   pRsp->pCont = buf;
   pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
   tmsgSendRsp(pRsp);
-  tFreeStreamDispatchReq(pReq);
   return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
 }
 
@@ -183,6 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
          pReq->upstreamTaskId);
 
   streamTaskEnqueue(pTask, pReq, pRsp);
+  tFreeStreamDispatchReq(pReq);
 
   if (exec) {
     streamTryExec(pTask);
@@ -246,24 +246,20 @@ int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq,
   return 0;
 }
 
-int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
-  if (pRsp->inputStatus == TASK_INPUT_STATUS__NORMAL) {
-    pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
-
-    streamProcessRunReq(pTask);
+int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
+  streamProcessRunReq(pTask);
 
-    if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
-      // scan data to recover
-      pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
-      pTask->taskStatus = TASK_STATUS__RECOVERING;
-      qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
-      if (streamPipelineExec(pTask, 100) < 0) {
-        return -1;
-      }
-    } else {
-      pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
-      pTask->taskStatus = TASK_STATUS__NORMAL;
+  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
+    // scan data to recover
+    pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
+    pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
+    qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
+    if (streamPipelineExec(pTask, 100, true) < 0) {
+      return -1;
     }
+  } else {
+    pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
+    pTask->taskStatus = TASK_STATUS__NORMAL;
   }
 
   return 0;
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index ffb7c04bf2469d7a6b57e852c2dada75997257cd..72249c51817dd4c209fbf81b1f167d51e51c65f7 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -93,7 +93,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
 }
 #endif
 
-int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
+int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
   ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
 
   void* exec = pTask->exec.executor;
@@ -125,24 +125,29 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
       taosArrayDestroy(pRes);
       break;
     }
-    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
-    if (qRes == NULL) {
-      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
-      return -1;
-    }
+    if (dispatch) {
+      SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
+      if (qRes == NULL) {
+        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
+        return -1;
+      }
 
-    qRes->type = STREAM_INPUT__DATA_BLOCK;
-    qRes->blocks = pRes;
-    qRes->childId = pTask->selfChildId;
+      qRes->type = STREAM_INPUT__DATA_BLOCK;
+      qRes->blocks = pRes;
+      qRes->childId = pTask->selfChildId;
 
-    if (streamTaskOutput(pTask, qRes) < 0) {
-      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
-      taosFreeQitem(qRes);
-      return -1;
-    }
+      if (streamTaskOutput(pTask, qRes) < 0) {
+        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
+        taosFreeQitem(qRes);
+        return -1;
+      }
 
-    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-      streamDispatch(pTask);
+      if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+        streamDispatch(pTask);
+      }
+    } else {
+      // nothing takes ownership of the result blocks when not dispatching, so release them here
+      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
     }
   }
 
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index b2a7e00877e53bae2bf5b78e71d348cc223ff8b7..263053778b1ae94de5a5353edf158e37604baf98 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -132,6 +132,62 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
   return 0;
 }
 
+// Serialize a recover-status request: streamId, downstreamTaskId, taskId (in that order).
+int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
+  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
+  return 0;
+}
+
+// Inverse of tEncodeSStreamTaskRecoverReq; fields are read in the same order.
+int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq) {
+  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
+  return 0;
+}
+
+// Serialize a recover-status response: the three ids, then the number of
+// checkpoint entries followed by each SStreamCheckpointInfo.
+int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp) {
+  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
+  int32_t sz = taosArrayGetSize(pRsp->checkpointVer);
+  if (tEncodeI32(pEncoder, sz) < 0) return -1;
+  for (int32_t i = 0; i < sz; i++) {
+    SStreamCheckpointInfo* pInfo = taosArrayGet(pRsp->checkpointVer, i);
+    if (tEncodeSStreamCheckpointInfo(pEncoder, pInfo) < 0) return -1;
+  }
+  return 0;
+}
+
+// Inverse of tEncodeSStreamTaskRecoverRsp. On success pRsp->checkpointVer holds a
+// newly created array; if decoding fails after the array was created, the array
+// is destroyed and the field reset to NULL.
+int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp) {
+  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
+  int32_t sz;
+  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
+  // the element count is read from the message, so reject a negative value
+  if (sz < 0) return -1;
+  pRsp->checkpointVer = taosArrayInit(sz, sizeof(SStreamCheckpointInfo));
+  if (pRsp->checkpointVer == NULL) return -1;
+  for (int32_t i = 0; i < sz; i++) {
+    SStreamCheckpointInfo info;
+    if (tDecodeSStreamCheckpointInfo(pDecoder, &info) < 0 || taosArrayPush(pRsp->checkpointVer, &info) == NULL) {
+      // do not hand a half-filled array back to the caller
+      taosArrayDestroy(pRsp->checkpointVer);
+      pRsp->checkpointVer = NULL;
+      return -1;
+    }
+  }
+  return 0;
+}
+
 int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
   void* buf = NULL;
 
@@ -223,25 +279,163 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
   return 0;
 }
 
-int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
+// Build a recover-status request for one downstream vgroup and send it.
+// Returns 0 once the message has been handed to tmsgSendReq, -1 if sizing,
+// allocating or encoding the request fails.
+int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
+  int32_t taskId = pVgInfo->taskId;
+  int32_t nodeId = pVgInfo->vgId;
+  SStreamRecoverDownstreamReq req = {
+      .streamId = pTask->streamId,
+      .downstreamTaskId = taskId,
+      .taskId = pTask->taskId,
+  };
+  int32_t tlen;
+  int32_t code;
+  tEncodeSize(tEncodeSStreamTaskRecoverReq, &req, tlen, code);
+  if (code < 0) {
+    return -1;
+  }
+  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + tlen);
+  if (buf == NULL) {
+    return -1;
+  }
+  void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+  SEncoder encoder;
+  tEncoderInit(&encoder, abuf, tlen);
+  if (tEncodeSStreamTaskRecoverReq(&encoder, &req) < 0) {
+    tEncoderClear(&encoder);
+    taosMemoryFree(buf);
+    return -1;
+  }
+  tEncoderClear(&encoder);
+
+  ((SMsgHead*)buf)->vgId = htonl(nodeId);
+  SRpcMsg msg = {
+      .pCont = buf, .contLen = sizeof(SMsgHead) + tlen,
+      /*.msgType = */
+  };
+  tmsgSendReq(&pVgInfo->epSet, &msg);
+
+  return 0;
+}
+
+// Start the downstream-status phase of recovery for pTask: mark the task
+// RECOVER_DOWNSTREAM, make sure a SStreamRecoverStatus entry exists in
+// pMeta->pRecoverStatus, and query every shuffle-dispatch downstream. Returns 0 or -1.
+int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
   // set self status to recover_phase1
-  // build fetch status msg
-  // send fetch msg
+  SStreamRecoverStatus* pRecover = NULL;
   atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
+  // entries are stored as SStreamRecoverStatus*, so a lookup yields a pointer to that stored pointer
+  SStreamRecoverStatus** ppRecover = taosHashGet(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t));
+  if (ppRecover != NULL) {
+    pRecover = *ppRecover;
+  } else {
+    pRecover = taosMemoryCalloc(1, sizeof(SStreamRecoverStatus));
+    if (pRecover == NULL) {
+      return -1;
+    }
+    pRecover->info = taosArrayInit(0, sizeof(void*));
+    if (pRecover->info == NULL) {
+      taosMemoryFree(pRecover);
+      return -1;
+    }
+    if (taosHashPut(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t), &pRecover, sizeof(void*)) < 0) {
+      taosArrayDestroy(pRecover->info);
+      taosMemoryFree(pRecover);
+      return -1;
+    }
+  }
 
   if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
+    pRecover->totReq = 1;
+    atomic_store_32(&pRecover->waitingRspCnt, 1);
   } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+    int32_t numOfDownstream = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
+    pRecover->totReq = numOfDownstream;
+    // arm the counter before any request leaves, so an early response cannot drive it below zero
+    atomic_store_32(&pRecover->waitingRspCnt, numOfDownstream);
+    for (int32_t i = 0; i < numOfDownstream; i++) {
+      SVgroupInfo* pVgInfo = taosArrayGet(pTask->shuffleDispatcher.dbInfo.pVgroupInfos, i);
+      if (streamFetchRecoverStatus(pTask, pVgInfo) < 0) {
+        return -1;
+      }
+    }
   } else {
     ASSERT(0);
   }
 
   return 0;
 }
 
-int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) {
+// Record one downstream recover-status response for pTask. When the last
+// expected response arrives, merge the reported checkpoints per source node
+// (keeping the smallest stateProcessedVer), run the recover scan for source
+// tasks, drop the bookkeeping entry and set the task status to NORMAL.
+// Returns 0 on success, -1 on failure.
+int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
   // if failed, set timer and retry
   // if successful
-  // add rsp state to partial recover hash
-  // if complete, begin actual recover
+  int32_t taskId = pTask->taskId;
+  // entries are stored as SStreamRecoverStatus*, so the lookup returns a pointer to the stored pointer
+  SStreamRecoverStatus** ppRecover = taosHashGet(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
+  if (ppRecover == NULL) {
+    return -1;
+  }
+  SStreamRecoverStatus* pRecover = *ppRecover;
+
+  if (taosArrayPush(pRecover->info, &pRsp->checkpointVer) == NULL) {
+    return -1;
+  }
+
+  int32_t leftRsp = atomic_sub_fetch_32(&pRecover->waitingRspCnt, 1);
+  ASSERT(leftRsp >= 0);
+
+  if (leftRsp == 0) {
+    ASSERT(taosArrayGetSize(pRecover->info) == pRecover->totReq);
+
+    // srcNodeId -> SStreamCheckpointInfo (stored by value, released by taosHashCleanup)
+    SHashObj* pFinalChecks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
+    if (pFinalChecks == NULL) return -1;
+
+    for (int32_t i = 0; i < pRecover->totReq; i++) {
+      SArray* pChecks = taosArrayGetP(pRecover->info, i);
+      int32_t sz = taosArrayGetSize(pChecks);
+      for (int32_t j = 0; j < sz; j++) {
+        SStreamCheckpointInfo* pOneCheck = taosArrayGet(pChecks, j);
+        SStreamCheckpointInfo* pCheck = taosHashGet(pFinalChecks, &pOneCheck->srcNodeId, sizeof(int32_t));
+        if (pCheck == NULL) {
+          SStreamCheckpointInfo check = {
+              .srcNodeId = pOneCheck->srcNodeId,
+              .srcChildId = pOneCheck->srcChildId,
+              .stateProcessedVer = pOneCheck->stateProcessedVer,
+          };
+          if (taosHashPut(pFinalChecks, &check.srcNodeId, sizeof(int32_t), &check, sizeof(check)) < 0) {
+            taosHashCleanup(pFinalChecks);
+            return -1;
+          }
+        } else {
+          pCheck->stateProcessedVer = TMIN(pCheck->stateProcessedVer, pOneCheck->stateProcessedVer);
+        }
+      }
+    }
+    // load local state
+    //
+    // recover
+    //
+    if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
+      qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
+      if (streamPipelineExec(pTask, 10000, true) < 0) {
+        taosHashCleanup(pFinalChecks);
+        return -1;
+      }
+    }
+    taosHashCleanup(pFinalChecks);
+    // NOTE(review): the hash holds only a pointer to pRecover; unless pRecoverStatus was given a
+    // free callback, pRecover and pRecover->info leak after this removal - confirm and release here
+    taosHashRemove(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
+    atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
+  }
   return 0;
 }