streamRecover.c 4.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"

int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
L
Liu Jicong 已提交
22 23
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
L
Liu Jicong 已提交
32 33
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
L
Liu Jicong 已提交
34 35 36 37 38 39 40
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
L
Liu Jicong 已提交
41 42
  if (tEncodeI32(pEncoder, pRsp->reqTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->rspTaskId) < 0) return -1;
L
Liu Jicong 已提交
43 44 45 46 47 48 49 50
  if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
51 52
  if (tDecodeI32(pDecoder, &pReq->reqTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->rspTaskId) < 0) return -1;
L
Liu Jicong 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
  if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
91
#if 0
L
Liu Jicong 已提交
92 93 94 95 96
  if (pTask->taskStatus != TASK_STATUS__FAIL) {
    return 0;
  }

  if (pTask->isStreamDistributed) {
97
    if (pTask->taskType == TASK_TYPE__SOURCE) {
L
Liu Jicong 已提交
98
      pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
99
    } else if (pTask->taskType != TASK_TYPE__SINK) {
L
Liu Jicong 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
      pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
      bool    hasCheckpoint = false;
      int32_t childSz = taosArrayGetSize(pTask->childEpInfo);
      for (int32_t i = 0; i < childSz; i++) {
        SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
        if (pEpInfo->checkpointVer == -1) {
          hasCheckpoint = true;
          break;
        }
      }
      if (hasCheckpoint) {
        // load from checkpoint
      } else {
        // recover child
      }
    }
  } else {
117
    if (pTask->taskType == TASK_TYPE__SOURCE) {
L
Liu Jicong 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130
      if (pTask->checkpointVer != -1) {
        // load from checkpoint
      } else {
        // reset stream query task info
        // TODO get snapshot ver
        pTask->recoverSnapVer = -1;
        qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
        pTask->taskStatus = TASK_STATUS__RECOVERING;
      }
    }
  }

  if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
L
Liu Jicong 已提交
131
    if (streamPipelineExec(pTask, 100) < 0) {
132 133 134
      // set fail
      return -1;
    }
L
Liu Jicong 已提交
135
  }
136

137
#endif
L
Liu Jicong 已提交
138 139
  return 0;
}