/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "streamInc.h" int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; tEndDecode(pDecoder); return 0; } int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->nodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1; tEndDecode(pDecoder); return 0; } int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; if (tEncodeI8(pEncoder, pReq->taskLevel) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->taskLevel) < 0) return -1; tEndDecode(pDecoder); return 0; } int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->taskLevel) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->taskLevel) < 0) return -1; tEndDecode(pDecoder); return 0; } static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) { if (pTask->checkpointingId == 0) { pTask->checkpointingId = checkpointId; pTask->checkpointAlignCnt = taosArrayGetSize(pTask->childEpInfo); } tAssert(pTask->checkpointingId == checkpointId); return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); } static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { // commit tdb state streamStateCommit(pTask->pState); // commit non-tdb state // copy and save new state // report to mnode // send checkpoint req to downstream return 0; } static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { // ref wal // set status checkpointing // do checkpoint return 0; } int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { int32_t code; int64_t checkpointId = pReq->checkpointId; code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId); if (code < 0) { // rsp error return -1; } return 0; } int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) { int32_t code; int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; if (taosArrayGetSize(pTask->childEpInfo) > 0) { code = streamAlignCheckpoint(pTask, checkpointId, childId); if (code > 0) { return 0; } if (code < 0) { tAssert(0); return -1; } } code = streamDoCheckpoint(pMeta, pTask, checkpointId); if (code < 0) { // rsp error return -1; } // send rsp to all children return 0; } int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp) { // recover step2, scan from wal // unref wal // set status normal return 0; }