streamRecover.c 13.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"

18
/**
 * Start the recover (fill-history) procedure of a task according to its level.
 *
 * - Source task: the status is set to TASK_STATUS__RECOVER_PREPARE, the executor is
 *   prepared for recover step 1, and a TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE message
 *   carrying a SStreamRecoverStep1Req is put on the stream queue.
 * - Agg task: the status is set to TASK_STATUS__NORMAL and the number of upstream tasks
 *   to wait for is recorded by streamAggRecoverPrepare().
 * - Sink task: the status is set to TASK_STATUS__NORMAL; nothing else is done.
 *
 * @param pTask the stream task to launch
 * @return 0 on success; -1 if the request buffer cannot be allocated or the message
 *         cannot be put on the stream queue
 */
int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
  qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
    qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus);

    streamSetParamForRecover(pTask);
    streamSourceRecoverPrepareStep1(pTask, pTask->dataRange.range.maxVer, pTask->dataRange.window.ekey);

    // Zero the whole struct first: it is copied byte-for-byte into the rpc buffer below, so any
    // member that streamBuildSourceRecover1Req() does not set must not carry stack garbage.
    SStreamRecoverStep1Req req = {0};
    streamBuildSourceRecover1Req(pTask, &req);
    int32_t len = sizeof(SStreamRecoverStep1Req);

    void* serializedReq = rpcMallocCont(len);
    if (serializedReq == NULL) {
      return -1;
    }

    memcpy(serializedReq, &req, len);

    SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE };
    if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
      // NOTE(review): who owns serializedReq after a failed enqueue is not visible from here;
      // confirm whether tmsgPutToQueue releases pCont on failure before adding an rpcFreeCont.
      qError("s-task:%s failed to put recover step1 msg into stream queue", pTask->id.idStr);
      return -1;
    }

  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    streamSetParamForRecover(pTask);
    streamAggRecoverPrepare(pTask);
  } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
    // sink nodes have no specific operation for fill history
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  }

  return 0;
}

// checkstatus
57 58
/**
 * Ask every downstream task whether it is ready before this task enters the recover stage.
 *
 * - Fixed dispatch: one check request is sent and its id is remembered in pTask->checkReqId.
 * - Shuffle dispatch: one check request per vgroup is sent; the request ids are collected in
 *   pTask->checkReqIds and pTask->recoverTryingDownstream is set to the number of vgroups.
 * - Any other output type: there is no downstream, so recover is launched directly.
 *
 * @param pTask the upstream task issuing the check
 * @param ver   version value, only used for logging here
 * @return 0 on success; -1 if the request-id list cannot be allocated (shuffle dispatch)
 */
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) {
  qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver);

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->nodeId,
      .childId = pTask->selfChildId,
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
           req.downstreamNodeId);
    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    // Allocate the id list before anything is dispatched; without it the responses
    // could never be matched against the requests sent below.
    pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
    if (pTask->checkReqIds == NULL) {
      qError("s-task:%s failed to allocate check req id list for %d downstream tasks", pTask->id.idStr, numOfVgs);
      return -1;
    }

    pTask->recoverTryingDownstream = numOfVgs;

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
             req.downstreamTaskId, req.downstreamNodeId);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
    streamTaskLaunchRecover(pTask);
  }

  return 0;
}

/**
 * Send a check request again to the single downstream task named in a previous check
 * response. The request is rebuilt from the fields of pRsp, including its reqId.
 *
 * @param pTask the upstream task that issues the re-check
 * @param pRsp  the response the request is rebuilt from
 * @return always 0
 */
int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {0};
  req.reqId = pRsp->reqId;
  req.streamId = pRsp->streamId;
  req.upstreamTaskId = pRsp->upstreamTaskId;
  req.upstreamNodeId = pRsp->upstreamNodeId;
  req.downstreamTaskId = pRsp->downstreamTaskId;
  req.downstreamNodeId = pRsp->downstreamNodeId;
  req.childId = pRsp->childId;

  qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  // shuffle dispatch: look up the endpoint set of every vgroup whose task id matches
  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t total = taosArrayGetSize(pVgList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SVgroupInfo* pInfo = taosArrayGet(pVgList, idx);
    if (pInfo->taskId != req.downstreamTaskId) {
      continue;
    }
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pInfo->epSet);
  }

  return 0;
}

134 135
/**
 * Report whether the task is currently in the normal state.
 *
 * @return 1 if the task status equals TASK_STATUS__NORMAL, otherwise 0
 */
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
    return 1;
  }
  return 0;
}

138
/**
 * Handle the response of a downstream readiness check.
 *
 * - status != 1: the downstream task is not ready; sleep 100ms and re-send the check.
 * - status == 1, fixed dispatch: launch recover if the reqId matches pTask->checkReqId.
 * - status == 1, shuffle dispatch: the reqId must be one of pTask->checkReqIds; the
 *   pending counter is decremented and recover is launched once it reaches zero.
 *
 * @param pTask the upstream task that sent the check
 * @param pRsp  the response received from the downstream task
 * @param ver   not used in this function
 * @return 0 normally; -1 if the response carries an unknown reqId
 */
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t ver) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);

  qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr,
         pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);

  if (pRsp->status != 1) {
    // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
           pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckOneDownstream(pTask, pRsp);
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    if (pRsp->reqId != pTask->checkReqId) {
      return -1;
    }

    streamTaskLaunchRecover(pTask);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    ASSERT(0);
    return 0;
  }

  // shuffle dispatch: the response must answer one of the requests that were sent out
  int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
  bool    matched = false;
  for (int32_t idx = 0; idx < numOfReqs && !matched; ++idx) {
    int64_t* pReqId = taosArrayGet(pTask->checkReqIds, idx);
    matched = (*pReqId == pRsp->reqId);
  }

  if (!matched) {
    return -1;
  }

  int32_t remaining = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
  ASSERT(remaining >= 0);

  if (remaining == 0) {
    taosArrayDestroy(pTask->checkReqIds);
    pTask->checkReqIds = NULL;

    qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
    streamTaskLaunchRecover(pTask);
  }

  return 0;
}

191 192
// common
/**
 * Forward to qStreamSetParamForRecover() on the task's executor.
 *
 * @return whatever qStreamSetParamForRecover() returns
 */
int32_t streamSetParamForRecover(SStreamTask* pTask) {
  return qStreamSetParamForRecover(pTask->exec.pExecutor);
}
/**
 * Forward to qStreamRestoreParam() on the task's executor.
 *
 * @return whatever qStreamRestoreParam() returns
 */
int32_t streamRestoreParam(SStreamTask* pTask) {
  return qStreamRestoreParam(pTask->exec.pExecutor);
}
200

201
/**
 * Atomically set the task status to TASK_STATUS__NORMAL.
 *
 * @return always 0
 */
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  int8_t* pStatus = &pTask->status.taskStatus;
  atomic_store_8(pStatus, TASK_STATUS__NORMAL);
  return 0;
}

// source
207
/**
 * Forward to qStreamSourceRecoverStep1() on the task's executor.
 *
 * @param pTask the source task
 * @param ver   version passed through to the executor
 * @param ekey  end key passed through to the executor
 * @return whatever qStreamSourceRecoverStep1() returns
 */
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey) {
  return qStreamSourceRecoverStep1(pTask->exec.pExecutor, ver, ekey);
}

/**
 * Fill a recover step-1 request with the identity of the given task
 * (stream id, task id, and the task's node id as the message-head vgId).
 *
 * @param pTask the task the request is built for
 * @param pReq  [out] request to fill; only the three fields above are written
 * @return always 0
 */
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
  pReq->msgHead.vgId = pTask->nodeId;
  return 0;
}

/**
 * Run the step-1 recover scan by calling streamScanExec() with a second argument of 100.
 *
 * @return whatever streamScanExec() returns
 */
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
  int32_t code = streamScanExec(pTask, 100);
  return code;
}

/**
 * Fill a recover step-2 request with the identity of the given task
 * (stream id, task id, and the task's node id as the message-head vgId).
 *
 * @param pTask the task the request is built for
 * @param pReq  [out] request to fill; only the three fields above are written
 * @return always 0
 */
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
  pReq->msgHead.vgId = pTask->nodeId;
  return 0;
}

/**
 * Run recover step 2 (the blocking stage) of a source task: configure the executor through
 * qStreamSourceRecoverStep2(), run the scan through streamScanExec(), and log the elapsed time.
 *
 * @param pTask the source task
 * @param ver   version passed through to qStreamSourceRecoverStep2()
 * @return the result of streamScanExec(); -1 if the executor cannot be prepared for step 2
 */
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
  void*       exec = pTask->exec.pExecutor;
  const char* id = pTask->id.idStr;

  int64_t st = taosGetTimestampMs();
  qDebug("s-task:%s recover step2(blocking stage) started", id);

  // Do not scan with an executor that failed to switch to step 2; report the failure instead
  // of silently continuing.
  if (qStreamSourceRecoverStep2(exec, ver) < 0) {
    qError("s-task:%s failed to prepare recover step2(blocking stage), ver:%" PRId64, id, ver);
    return -1;
  }

  int32_t code = streamScanExec(pTask, 100);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id,  el);

  return code;
}

/**
 * Notify the downstream task(s) that this task has finished recovering.
 * With fixed dispatch one request is sent; with shuffle dispatch one request per vgroup
 * is sent. Other output types send nothing.
 *
 * @param pTask the task that finished recovering
 * @return always 0
 */
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
  SStreamRecoverFinishReq req = {0};
  req.streamId = pTask->id.streamId;
  req.childId = pTask->selfChildId;

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr,
           pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);

    req.taskId = pTask->fixedEpDispatcher.taskId;
    streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t total = taosArrayGetSize(pVgList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SVgroupInfo* pInfo = taosArrayGet(pVgList, idx);
    req.taskId = pInfo->taskId;
    streamDispatchOneRecoverFinishReq(pTask, &req, pInfo->vgId, &pInfo->epSet);
  }

  return 0;
}

// agg
/**
 * Record how many upstream tasks the agg task has to wait for: the counter
 * pTask->recoverWaitingUpstream is set to the size of pTask->childEpInfo.
 *
 * @return always 0
 */
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
  int32_t numOfUpstream = taosArrayGetSize(pTask->childEpInfo);
  pTask->recoverWaitingUpstream = numOfUpstream;

  qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream);
  return 0;
}

/**
 * Finish recovery of an agg task: restore the executor parameters, tell the executor
 * that recovery is finished, then set the task status to normal.
 *
 * @return 0 on success; -1 if either executor call fails (the status is left unchanged)
 */
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
  void* pExec = pTask->exec.pExecutor;

  // short-circuit keeps the original order: restore first, finish only if restore succeeded
  if (qStreamRestoreParam(pExec) < 0 || qStreamRecoverFinish(pExec) < 0) {
    return -1;
  }

  streamSetStatusNormal(pTask);
  return 0;
}

/**
 * Handle a "recover finished" notification from an upstream task. Only agg tasks react:
 * the counter of unfinished upstream tasks is decremented and, once it reaches zero,
 * streamAggChildrenRecoverFinish() is called.
 *
 * @param pTask   the task receiving the notification
 * @param childId not used in this function
 * @return always 0
 */
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
  if (pTask->taskLevel != TASK_LEVEL__AGG) {
    return 0;
  }

  int32_t remaining = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1);
  qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, remaining);
  ASSERT(remaining >= 0);

  if (remaining == 0) {
    streamAggChildrenRecoverFinish(pTask);
  }

  return 0;
}

300
/**
 * Encode a SStreamTaskCheckReq. Field order: reqId, streamId, upstreamNodeId,
 * upstreamTaskId, downstreamNodeId, downstreamTaskId, childId.
 *
 * @return pEncoder->pos after encoding on success; -1 as soon as any step fails
 */
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  // the || chain stops at the first failing step, keeping the field order intact
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

313
/**
 * Decode a SStreamTaskCheckReq. Field order: reqId, streamId, upstreamNodeId,
 * upstreamTaskId, downstreamNodeId, downstreamTaskId, childId.
 *
 * @return 0 on success; -1 as soon as any step fails
 */
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  // the || chain stops at the first failing step, keeping the field order intact
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

326
/**
 * Encode a SStreamTaskCheckRsp. Field order: reqId, streamId, upstreamNodeId,
 * upstreamTaskId, downstreamNodeId, downstreamTaskId, childId, status.
 *
 * @return pEncoder->pos after encoding on success; -1 as soon as any step fails
 */
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  // the || chain stops at the first failing step, keeping the field order intact
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
      tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

340
/**
 * Decode a SStreamTaskCheckRsp. Field order: reqId, streamId, upstreamNodeId,
 * upstreamTaskId, downstreamNodeId, downstreamTaskId, childId, status.
 *
 * @return 0 on success; -1 as soon as any step fails
 */
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  // the || chain stops at the first failing step, keeping the field order intact
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
      tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

354
/**
 * Encode a SStreamRecoverFinishReq. Field order: streamId, taskId, childId.
 *
 * @return pEncoder->pos after encoding on success; -1 as soon as any step fails
 */
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
  // the || chain stops at the first failing step, keeping the field order intact
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
362
/**
 * Decode a SStreamRecoverFinishReq. Field order: streamId, taskId, childId.
 *
 * @return 0 on success; -1 as soon as any step fails
 */
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
  // the || chain stops at the first failing step, keeping the field order intact
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->taskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}