streamRecover.c 7.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"

int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
L
Liu Jicong 已提交
22 23
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
L
Liu Jicong 已提交
32 33
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
L
Liu Jicong 已提交
34 35 36 37 38 39 40
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
L
Liu Jicong 已提交
41 42
  if (tEncodeI32(pEncoder, pRsp->reqTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->rspTaskId) < 0) return -1;
L
Liu Jicong 已提交
43 44 45 46 47 48 49 50
  if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
51 52
  if (tDecodeI32(pDecoder, &pReq->reqTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->rspTaskId) < 0) return -1;
L
Liu Jicong 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
  if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
90 91
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
  if (tEncodeI32(pEncoder, pCheckpoint->nodeId) < 0) return -1;
L
Liu Jicong 已提交
92
  if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
L
Liu Jicong 已提交
93
  if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
L
Liu Jicong 已提交
94 95 96
  return 0;
}

L
Liu Jicong 已提交
97 98
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
  if (tDecodeI32(pDecoder, &pCheckpoint->nodeId) < 0) return -1;
L
Liu Jicong 已提交
99
  if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
L
Liu Jicong 已提交
100
  if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
L
Liu Jicong 已提交
101 102 103
  return 0;
}

L
Liu Jicong 已提交
104
int32_t tEncodeSStreamMultiVgCheckpointInfo(SEncoder* pEncoder, const SStreamMultiVgCheckpointInfo* pCheckpoint) {
L
Liu Jicong 已提交
105 106
  if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1;
  if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1;
L
Liu Jicong 已提交
107
  if (tEncodeI32(pEncoder, pCheckpoint->checkpointId) < 0) return -1;
L
Liu Jicong 已提交
108 109 110 111
  if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1;
  int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer);
  if (tEncodeI32(pEncoder, sz) < 0) return -1;
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
112 113
    SStreamCheckpointInfo* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i);
    if (tEncodeSStreamCheckpointInfo(pEncoder, pOneVgCkpoint) < 0) return -1;
L
Liu Jicong 已提交
114
  }
L
Liu Jicong 已提交
115 116
  return 0;
}
L
Liu Jicong 已提交
117

L
Liu Jicong 已提交
118
int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCheckpointInfo* pCheckpoint) {
L
Liu Jicong 已提交
119 120
  if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1;
L
Liu Jicong 已提交
121
  if (tDecodeI32(pDecoder, &pCheckpoint->checkpointId) < 0) return -1;
L
Liu Jicong 已提交
122 123 124 125
  if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1;
  int32_t sz;
  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
126 127
    SStreamCheckpointInfo oneVgCheckpoint;
    if (tDecodeSStreamCheckpointInfo(pDecoder, &oneVgCheckpoint) < 0) return -1;
L
Liu Jicong 已提交
128
    taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint);
L
Liu Jicong 已提交
129
  }
L
Liu Jicong 已提交
130 131
  return 0;
}
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
  void* buf = NULL;

  ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
  int32_t sz = taosArrayGetSize(pTask->checkpointInfo);

  SStreamMultiVgCheckpointInfo checkpoint;
  checkpoint.checkpointId = 0;
  checkpoint.checkTs = taosGetTimestampMs();
  checkpoint.streamId = pTask->streamId;
  checkpoint.taskId = pTask->taskId;
  checkpoint.checkpointVer = pTask->checkpointInfo;

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSStreamMultiVgCheckpointInfo, &checkpoint, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return -1;
  }
  SEncoder encoder;
  tEncoderInit(&encoder, buf, len);
  tEncodeSStreamMultiVgCheckpointInfo(&encoder, &checkpoint);
  tEncoderClear(&encoder);

  SStreamCheckpointKey key = {
      .taskId = pTask->taskId,
      .checkpointId = checkpoint.checkpointId,
  };

  if (tdbTbUpsert(pMeta->pStateDb, &key, sizeof(SStreamCheckpointKey), buf, len, &pMeta->txn) < 0) {
    ASSERT(0);
    goto FAIL;
  }

  taosMemoryFree(buf);
  return 0;
FAIL:
  if (buf) taosMemoryFree(buf);
  return -1;
}

L
Liu Jicong 已提交
179 180 181 182 183 184 185
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
  // load status
  void*   pVal = NULL;
  int32_t vLen = 0;
  if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
    return -1;
L
Liu Jicong 已提交
186
  }
L
Liu Jicong 已提交
187 188
  SDecoder decoder;
  tDecoderInit(&decoder, pVal, vLen);
L
Liu Jicong 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
  SStreamMultiVgCheckpointInfo aggCheckpoint;
  tDecodeSStreamMultiVgCheckpointInfo(&decoder, &aggCheckpoint);
  tDecoderClear(&decoder);

  pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
  pTask->checkpointInfo = aggCheckpoint.checkpointVer;

  return 0;
}

int32_t streamCheckAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
  // save and copy state
  // save state info
  return 0;
}

int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
  // try recover sink level
  // after all sink level recovered, choose current state backend to recover
  return 0;
}

int32_t streamCheckSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
  // try recover agg level
  //
  return 0;
}

int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
L
Liu Jicong 已提交
222 223
  return 0;
}
224

L
Liu Jicong 已提交
225 226
int32_t streamRecoverTask(SStreamTask* pTask) {
  //
L
Liu Jicong 已提交
227 228
  return 0;
}