streamRecover.c 12.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"

18
// Launches recovery for a task according to its level:
//  - SOURCE: sets the status to TASK_STATUS__RECOVER_PREPARE, calls streamSetParamForRecover() and
//            streamSourceRecoverPrepareStep1(pTask, version), then puts a
//            TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE message carrying a SStreamRecoverStep1Req on STREAM_QUEUE.
//  - AGG:    sets the status to TASK_STATUS__NORMAL, calls streamSetParamForRecover() and streamAggRecoverPrepare().
//  - SINK:   sets the status to TASK_STATUS__NORMAL.
// Any other level is left untouched.
// Returns 0 on success, -1 if the step-1 message cannot be allocated or cannot be put on the queue.
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
  qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
    streamSetParamForRecover(pTask);
    streamSourceRecoverPrepareStep1(pTask, version);

    // Zero-initialize: the struct is copied byte-for-byte into the message below, while the builder only sets
    // msgHead.vgId, streamId and taskId. Anything else (other members, padding) would otherwise be indeterminate.
    SStreamRecoverStep1Req req = {0};
    streamBuildSourceRecover1Req(pTask, &req);
    int32_t len = sizeof(SStreamRecoverStep1Req);

    void* serializedReq = rpcMallocCont(len);
    if (serializedReq == NULL) {
      return -1;
    }

    memcpy(serializedReq, &req, len);

    SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE};
    if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
      // NOTE(review): who owns serializedReq after a failed enqueue is not visible from this file, so it is
      // deliberately not freed here (risk of a double free) -- confirm against tmsgPutToQueue().
      qDebug("s-task:%s at node %d failed to enqueue recover step1 msg", pTask->id.idStr, pTask->nodeId);
      return -1;
    }
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    streamSetParamForRecover(pTask);
    streamAggRecoverPrepare(pTask);
  } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  }

  return 0;
}

// checkstatus
// Sends a check request to every downstream task of pTask.
//  - fixed dispatch:   one request to the fixed downstream; its id is remembered in pTask->checkReqId.
//  - shuffle dispatch: one request per vgroup; the ids are collected in pTask->checkReqIds and
//                      pTask->recoverTryingDownstream is set to the number of vgroups.
//  - otherwise:        there is no downstream, so recovery is launched directly with `version`.
// Returns 0 on success, -1 if the request-id array cannot be allocated, or the result of
// streamTaskLaunchRecover() when recovery is launched directly.
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->nodeId,
      .childId = pTask->selfChildId,
  };

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId,
           req.downstreamNodeId);
    streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t vgSz = taosArrayGetSize(vgInfo);

    // Allocate before touching the task state so that a failed allocation leaves the task unchanged and the
    // pushes below never operate on a NULL array.
    // NOTE(review): an array already stored in pTask->checkReqIds is overwritten, not destroyed -- confirm that
    // this function cannot run twice before the responses have drained.
    SArray* reqIds = taosArrayInit(vgSz, sizeof(int64_t));
    if (reqIds == NULL) {
      return -1;
    }
    pTask->recoverTryingDownstream = vgSz;
    pTask->checkReqIds = reqIds;

    for (int32_t i = 0; i < vgSz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId,
             req.downstreamTaskId, req.downstreamNodeId);
      streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId);
    // propagate a launch failure instead of always reporting success
    return streamTaskLaunchRecover(pTask, version);
  }
  return 0;
}

// Re-sends the check request described by a check response, reusing the request id and endpoints found in pRsp.
// For fixed dispatch the request goes to the fixed downstream endpoint set; for shuffle dispatch it goes to every
// vgroup whose taskId equals the downstream task id of the response. Other output types send nothing.
// Always returns 0.
int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq retryReq = {0};
  retryReq.reqId = pRsp->reqId;
  retryReq.streamId = pRsp->streamId;
  retryReq.upstreamTaskId = pRsp->upstreamTaskId;
  retryReq.upstreamNodeId = pRsp->upstreamNodeId;
  retryReq.downstreamTaskId = pRsp->downstreamTaskId;
  retryReq.downstreamNodeId = pRsp->downstreamNodeId;
  retryReq.childId = pRsp->childId;

  qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
         retryReq.downstreamTaskId, retryReq.downstreamNodeId);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchOneCheckReq(pTask, &retryReq, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgroups = taosArrayGetSize(pVgroups);
  for (int32_t idx = 0; idx < numOfVgroups; ++idx) {
    SVgroupInfo* pInfo = taosArrayGet(pVgroups, idx);
    if (pInfo->taskId != retryReq.downstreamTaskId) {
      continue;
    }
    // the scan keeps going after a match, so every vgroup carrying this task id is contacted
    streamDispatchOneCheckReq(pTask, &retryReq, pRsp->downstreamNodeId, &pInfo->epSet);
  }

  return 0;
}

// Answers a check request: returns 1 when the task status is TASK_STATUS__NORMAL, 0 otherwise.
// pReq is not consulted.
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) {
  const int32_t isNormal = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL);
  return isNormal;
}

// Handles the response to a downstream check request.
//  - status != 1: the downstream is not ready and the same request is sent again.
//  - status == 1, shuffle dispatch: the response must carry one of the ids in pTask->checkReqIds; the counter
//    pTask->recoverTryingDownstream is decremented and, once it reaches 0, the id array is released and
//    recovery is launched with `version`.
//  - status == 1, fixed dispatch: the response must carry pTask->checkReqId; recovery is launched right away.
// Returns -1 for a response with an unknown request id, the result of streamTaskLaunchRecover() when recovery
// is launched, and 0 otherwise.
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
  qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId,
         pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);

  if (pRsp->status != 1) {
    // Downstream not ready: retry. NOTE(review): the original comment asks for a wait of at least 100ms before
    // retrying; no delay is implemented here, the request is re-sent immediately.
    return streamRecheckOneDownstream(pTask, pRsp);
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    bool    found = false;
    int32_t numOfReqIds = taosArrayGetSize(pTask->checkReqIds);
    for (int32_t i = 0; i < numOfReqIds; i++) {
      int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
      if (reqId == pRsp->reqId) {
        found = true;
        break;
      }
    }

    if (!found) {
      return -1;
    }

    int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
    ASSERT(left >= 0);
    if (left == 0) {
      // last outstanding downstream answered: the id list is no longer needed
      taosArrayDestroy(pTask->checkReqIds);
      pTask->checkReqIds = NULL;
      return streamTaskLaunchRecover(pTask, version);
    }
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    if (pRsp->reqId != pTask->checkReqId) {
      return -1;
    }
    return streamTaskLaunchRecover(pTask, version);
  }

  // a task without a dispatching output never sends check requests, so it cannot receive a response
  ASSERT(0);
  return 0;
}

171 172
// common
// Forwards to qStreamSetParamForRecover() on the task's executor and returns its result.
int32_t streamSetParamForRecover(SStreamTask* pTask) {
  return qStreamSetParamForRecover(pTask->exec.pExecutor);
}
// Forwards to qStreamRestoreParam() on the task's executor and returns its result.
int32_t streamRestoreParam(SStreamTask* pTask) {
  return qStreamRestoreParam(pTask->exec.pExecutor);
}
180

181
// Atomically stores TASK_STATUS__NORMAL into the task status. Always returns 0.
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  const int32_t code = 0;

  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);

  return code;
}

// source
// Forwards to qStreamSourceRecoverStep1() on the task's executor with `ver` and returns its result.
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) {
  return qStreamSourceRecoverStep1(pTask->exec.pExecutor, ver);
}

// Fills a recover step-1 request from the task: stream id, task id, and the task's node id as msgHead.vgId.
// No other field of *pReq is written. Always returns 0.
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
  pReq->msgHead.vgId = pTask->nodeId;

  return 0;
}

// Runs streamScanExec() on the task and returns its result.
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
  // NOTE(review): 100 is presumably a batch size -- confirm against streamScanExec()
  int32_t code = streamScanExec(pTask, 100);
  return code;
}

// Fills a recover step-2 request from the task: stream id, task id, and the task's node id as msgHead.vgId.
// No other field of *pReq is written. Always returns 0.
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
  pReq->msgHead.vgId = pTask->nodeId;

  return 0;
}

// Recover step 2 of a source task: calls qStreamSourceRecoverStep2() on the task's executor with `ver`, then
// runs streamScanExec() on the task.
// Returns -1 if qStreamSourceRecoverStep2() fails, otherwise the result of streamScanExec().
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
  void* exec = pTask->exec.pExecutor;
  if (qStreamSourceRecoverStep2(exec, ver) < 0) {
    // This failure used to fall into an empty branch and the scan ran anyway; report it instead of scanning
    // with an executor for which step 2 was not accepted.
    return -1;
  }
  return streamScanExec(pTask, 100);
}

// Sends a recover-finish request, carrying this task's stream id and child id, to the downstream task(s):
// the single fixed downstream for fixed dispatch, or one request per vgroup for shuffle dispatch (taskId is set
// to the respective downstream task id). Other output types send nothing. The results of the individual
// dispatch calls are not inspected. Always returns 0.
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
  SStreamRecoverFinishReq finishReq = {0};
  finishReq.streamId = pTask->id.streamId;
  finishReq.childId = pTask->selfChildId;

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    finishReq.taskId = pTask->fixedEpDispatcher.taskId;
    streamDispatchOneRecoverFinishReq(pTask, &finishReq, pTask->fixedEpDispatcher.nodeId,
                                      &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgroups = taosArrayGetSize(pVgroups);
  for (int32_t idx = 0; idx < numOfVgroups; ++idx) {
    SVgroupInfo* pInfo = taosArrayGet(pVgroups, idx);
    finishReq.taskId = pInfo->taskId;
    streamDispatchOneRecoverFinishReq(pTask, &finishReq, pInfo->vgId, &pInfo->epSet);
  }

  return 0;
}

// agg
// Sets pTask->recoverWaitingUpstream to the number of entries in pTask->childEpInfo and logs it.
// Always returns 0.
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
  int32_t numOfUpstream = taosArrayGetSize(pTask->childEpInfo);
  pTask->recoverWaitingUpstream = numOfUpstream;

  qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream);
  return 0;
}

// Finishes recovery on the task's executor: qStreamRestoreParam() first, then qStreamRecoverFinish(), and
// finally the task status is set to normal.
// Returns -1 as soon as one of the executor calls fails (the status is then left untouched), 0 otherwise.
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  // `||` short-circuits: qStreamRecoverFinish() only runs when qStreamRestoreParam() succeeded
  if (qStreamRestoreParam(pExecutor) < 0 || qStreamRecoverFinish(pExecutor) < 0) {
    return -1;
  }

  streamSetStatusNormal(pTask);
  return 0;
}

// Handles a recover-finish request from an upstream task. Only TASK_LEVEL__AGG tasks react: the counter
// pTask->recoverWaitingUpstream is decremented and, once it reaches 0, recovery of the task itself is finished
// through streamAggChildrenRecoverFinish(). childId is not consulted.
// Returns the result of streamAggChildrenRecoverFinish() when it is called, 0 otherwise.
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
  if (pTask->taskLevel != TASK_LEVEL__AGG) {
    return 0;
  }

  int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1);
  qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left);
  ASSERT(left >= 0);

  if (left == 0) {
    // propagate a failed finish: in that case the task status has not been switched back to normal
    return streamAggChildrenRecoverFinish(pTask);
  }
  return 0;
}

270
// Encodes a SStreamTaskCheckReq. Field order on the wire: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId.
// Returns pEncoder->pos after tEndEncode(), or -1 at the first encoding step that fails.
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // `||` short-circuits, so the fields are written strictly in this order and nothing follows a failure
  if (tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

283
// Decodes a SStreamTaskCheckReq. Fields are read in the order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId.
// Returns 0 on success, or -1 at the first decoding step that fails (fields read so far stay written in *pReq).
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // `||` short-circuits, so the fields are read strictly in this order and nothing follows a failure
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

296
// Encodes a SStreamTaskCheckRsp. Field order on the wire: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, status.
// Returns pEncoder->pos after tEndEncode(), or -1 at the first encoding step that fails.
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // `||` short-circuits, so the fields are written strictly in this order and nothing follows a failure
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
      tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

310
// Decodes a SStreamTaskCheckRsp. Fields are read in the order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, status.
// Returns 0 on success, or -1 at the first decoding step that fails (fields read so far stay written in *pRsp).
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // `||` short-circuits, so the fields are read strictly in this order and nothing follows a failure
  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
      tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

324
// Encodes a SStreamRecoverFinishReq. Field order on the wire: streamId, taskId, childId.
// Returns pEncoder->pos after tEndEncode(), or -1 at the first encoding step that fails.
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // `||` short-circuits, so the fields are written strictly in this order and nothing follows a failure
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
332
// Decodes a SStreamRecoverFinishReq. Fields are read in the order: streamId, taskId, childId.
// Returns 0 on success, or -1 at the first decoding step that fails (fields read so far stay written in *pReq).
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // `||` short-circuits, so the fields are read strictly in this order and nothing follows a failure
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->taskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}