/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
#include "ttimer.h"
#include "wal.h"

L
liuyao 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
// Post a TDMT_VND_STREAM_SCAN_HISTORY message for pTask onto the stream queue.
//
// pTask       - task whose ids are placed in the request (see streamBuildSourceRecover1Req)
// igUntreated - copied into the request's igUntreated field
//
// Returns 0 when the message was queued; -1 when the message buffer could not
// be allocated or the message could not be put into the queue.
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  // The struct is copied byte-for-byte into the message below, so zero it first:
  // anything the builder does not assign (and any padding) is then deterministic.
  SStreamScanHistoryReq req;
  memset(&req, 0, sizeof(req));
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);

  int32_t len = sizeof(SStreamScanHistoryReq);
  void*   serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    // NOTE(review): ownership of serializedReq on failure is not visible from this file;
    // it is intentionally not freed here -- confirm that tmsgPutToQueue releases it.
    qError("s-task:%s failed to put scan-history req into stream queue", pTask->id.idStr);
    return -1;
  }

  return 0;
}

40 41 42 43
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
    case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
44
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
45
    case TASK_STATUS__HALT: return "halt";
46
    case TASK_STATUS__PAUSE: return "paused";
47 48 49
    default:return "";
  }
}
50

51 52
// Apply the scan-history operator option and the version range / time window
// to the task's executor, then request the scan via streamStartRecoverTask().
// Returns whatever streamStartRecoverTask() returns.
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pVerRange = &pTask->dataRange.range;
  STimeWindow*   pWin = &pTask->dataRange.window;

  qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
         pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
         pVerRange->minVer, pVerRange->maxVer);

  // results of the two setters are not inspected here
  streamSetParamForScanHistoryData(pTask);
  streamSetParamForStreamScanner(pTask, pVerRange, pWin);

  return streamStartRecoverTask(pTask, 0);
}
64

65 66 67 68 69 70 71 72 73 74
// Take the level-specific action once a task is allowed to proceed:
//  - source task: launch the history scan when in scan-history status, otherwise
//    (status must be normal) only log;
//  - agg task: set status to normal, apply the scan-history option and record how
//    many upstream tasks to wait for;
//  - sink task: set status to normal.
// Returns the launch result for a scanning source task, 0 otherwise.
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      return doLaunchScanHistoryTask(pTask);
    }

    ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
    qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
           streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
           walReaderGetCurrentVer(pTask->exec.pWalReader));
    return 0;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamSetStatusNormal(pTask);
    streamSetParamForScanHistoryData(pTask);
    streamAggRecoverPrepare(pTask);
    return 0;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    streamSetStatusNormal(pTask);
    qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
  }

  return 0;
}

87
// check status
88
// Send a check request to every downstream task of pTask.
//
// Fixed dispatch: one request is sent and its id is remembered in pTask->checkReqId.
// Shuffle dispatch: one request per vgroup is sent; the ids are collected in
// pTask->checkReqIds and pTask->notReadyTasks is set to the number of vgroups.
// Any other output type: the task is marked as checked and
// streamTaskLaunchScanHistory() is invoked directly.
//
// Returns 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) if the request-id
// list for shuffle dispatch cannot be allocated.
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  qDebug("s-task:%s check downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
         pTask->id.idStr, pRange->range.minVer, pRange->range.maxVer, pRange->window.skey, pRange->window.ekey);

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId,
           req.downstreamNodeId);
    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    // Allocate the id list before touching any task state so that a failed
    // allocation leaves the task unchanged and nothing is pushed to a NULL array.
    SArray* pReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
    if (pReqIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pTask->notReadyTasks = numOfVgs;
    pTask->checkReqIds = pReqIds;

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    pTask->status.checkDownstream = 1;
    qDebug("s-task:%s (vgId:%d) set downstream task checked for task without downstream tasks, try to launch scan-history-data, status:%s",
           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
    streamTaskLaunchScanHistory(pTask);
  }

  return 0;
}

138
// Re-send a check request built from the fields of a previous response.
// For fixed dispatch the fixed dispatcher's ep set is used; for shuffle dispatch
// the request goes to every vgroup entry whose taskId equals the downstream
// task id in the response. Always returns 0.
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t num = taosArrayGetSize(pVgList);

    for (int32_t idx = 0; idx < num; ++idx) {
      SVgroupInfo* pInfo = taosArrayGet(pVgList, idx);
      if (pInfo->taskId != req.downstreamTaskId) {
        continue;
      }

      streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pInfo->epSet);
    }
  }

  return 0;
}

169 170
// Returns 1 when the task status (loaded atomically) is normal, 0 otherwise.
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
    return 1;
  }

  return 0;
}

173
// Handle the response to a downstream check request.
//
// A response with status 1 means the downstream task is ready:
//  - shuffle dispatch: the response's reqId must be one recorded in
//    pTask->checkReqIds (otherwise -1 is returned); notReadyTasks is decremented
//    and, once it reaches 0, the id list is released and the scan-history launch
//    is triggered if the task is in scan-history status;
//  - fixed dispatch: the reqId must equal pTask->checkReqId (otherwise -1); the
//    task is marked as checked and the launch is triggered likewise.
// Any other status: sleep 100ms and send the check request again.
//
// Returns 0, or -1 when the response does not match an outstanding request.
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* id = pTask->id.idStr;

  if (pRsp->status == 1) {
    if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      bool found = false;

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
      for (int32_t i = 0; i < numOfReqs; i++) {
        int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
        if (reqId == pRsp->reqId) {
          found = true;
          break;
        }
      }

      if (!found) {
        return -1;
      }

      int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
      ASSERT(left >= 0);

      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
        pTask->checkReqIds = NULL;

        if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
          qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs,
                 streamGetTaskStatusStr(pTask->status.taskStatus));
          streamTaskLaunchScanHistory(pTask);
        } else {
          ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
          // this is the shuffle-dispatch path: report all downstream tasks, not a "fixed" one
          qDebug("s-task:%s all %d downstream tasks are ready, now ready for data from wal, status:%s", id, numOfReqs,
                 streamGetTaskStatusStr(pTask->status.taskStatus));
        }
      } else {
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
      }
    } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

      ASSERT(pTask->status.checkDownstream == 0);
      pTask->status.checkDownstream = 1;

      ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);

      if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
               streamGetTaskStatusStr(pTask->status.taskStatus));
        streamTaskLaunchScanHistory(pTask);
      } else {
        ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
        qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
               streamGetTaskStatusStr(pTask->status.taskStatus));
      }
    } else {
      // a check response is only expected for the two dispatch output types
      ASSERT(0);
    }
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
  }

  return 0;
}

248
// common
249
// Log, then apply the scan-history operator option to the task's executor.
// Returns the result of qSetStreamOperatorOptionForScanHistory().
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
  qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);

  int32_t code = qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
  return code;
}
253

254
// Log, then restore the executor's operator option after a history scan.
// Returns the result of qRestoreStreamOperatorOption().
int32_t streamRestoreParam(SStreamTask* pTask) {
  qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr);

  int32_t code = qRestoreStreamOperatorOption(pTask->exec.pExecutor);
  return code;
}
258

259
// Log the previous status and atomically store TASK_STATUS__NORMAL as the task
// status. Always returns 0.
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.taskStatus));

  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);

  return 0;
}

// source
266 267
// Hand the version range and time window for the history scan to the task's
// executor. Returns the result of qStreamSourceScanParamForHistoryScan().
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExec = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScan(pExec, pVerRange, pWindow);
}

L
liuyao 已提交
270
// Fill a scan-history request from the task: node id (into msgHead.vgId),
// stream id, task id, plus the caller-supplied igUntreated flag.
// Fields not listed here are left untouched. Always returns 0.
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  // message head
  pReq->msgHead.vgId = pTask->info.nodeId;

  // task identity
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;

  // caller option
  pReq->igUntreated = igUntreated;

  return 0;
}

278
// Run the scan for this task via streamScanExec() with an argument of 100 and
// return its result.
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  int32_t code = streamScanExec(pTask, 100);
  return code;
}

// Execute recover step 2 (logged as the "blocking stage") for a source task:
// call qStreamSourceRecoverStep2() with the given version, run streamScanExec(),
// and log the elapsed time.
//
// A failure of qStreamSourceRecoverStep2() is logged but does not stop the scan,
// matching the previous best-effort behavior.
// Returns the result of streamScanExec().
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
  void*       exec = pTask->exec.pExecutor;
  const char* id = pTask->id.idStr;

  int64_t st = taosGetTimestampMs();
  qDebug("s-task:%s recover step2(blocking stage) started", id);

  if (qStreamSourceRecoverStep2(exec, ver) < 0) {
    // previously swallowed silently; make the failure visible in the log
    qWarn("s-task:%s recover step2 setup failed, ver:%" PRId64, id, ver);
  }

  int32_t code = streamScanExec(pTask, 100);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id,  el);

  return code;
}

299
// Notify downstream task(s) that this task finished scanning history data.
// Fixed dispatch sends one message to the fixed downstream task; shuffle
// dispatch sends one message per vgroup entry. The per-message results are not
// inspected; the function always returns 0.
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    qDebug("s-task:%s send scan-history-data complete msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
           pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));

    req.taskId = pTask->fixedEpDispatcher.taskId;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t num = taosArrayGetSize(pVgList);

    qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
           num, streamGetTaskStatusStr(pTask->status.taskStatus));

    for (int32_t idx = 0; idx < num; ++idx) {
      SVgroupInfo* pInfo = taosArrayGet(pVgList, idx);
      req.taskId = pInfo->taskId;
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pInfo->vgId, &pInfo->epSet);
    }
  }

  return 0;
}

H
Haojun Liao 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
// Encode a transfer-state request and send it to one downstream vgroup as a
// TDMT_STREAM_TRANSFER_STATE message with noResp set.
//
// pTask  - sending task (used for logging only)
// pReq   - request to encode (encoded with tEncodeStreamRecoverFinishReq)
// vgId   - destination vgroup id, written in network byte order into the head
// pEpSet - destination endpoint set
//
// Returns 0 after the message is handed to tmsgSendReq(), -1 if the size
// computation or the buffer allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY
// for the latter), or the negative encoder result if encoding fails.
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  int32_t code = -1;
  int32_t tlen = 0;

  tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamRecoverFinishReq(&encoder, pReq);

  // Clear the encoder on every path; the failure path used to return without
  // clearing it.
  tEncoderClear(&encoder);

  if (code < 0) {
    rpcFreeCont(buf);  // buf is known to be non-NULL here
    return code;
  }

  SRpcMsg msg = {0};
  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);

  return 0;
}

// Send a transfer-state message to the downstream task(s): one message for
// fixed dispatch, one per vgroup entry for shuffle dispatch. Results of the
// individual sends are not inspected; always returns 0.
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
           pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);

    req.taskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t num = taosArrayGetSize(pVgList);

    for (int32_t idx = 0; idx < num; ++idx) {
      SVgroupInfo* pInfo = taosArrayGet(pVgList, idx);
      req.taskId = pInfo->taskId;
      doDispatchTransferMsg(pTask, &req, pInfo->vgId, &pInfo->epSet);
    }
  }

  return 0;
}

391 392
// agg
// Record how many upstream tasks must report completion: numOfWaitingUpstream
// is set to the size of the upstream ep-info list. Always returns 0.
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);

  qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure",
         pTask->id.idStr, pTask->numOfWaitingUpstream);

  return 0;
}

// Restore the executor's operator option, finish the recover procedure in the
// executor, then set the task status to normal.
// Returns -1 as soon as either executor call fails (status untouched), else 0.
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
  void* pExec = pTask->exec.pExecutor;

  // short-circuit keeps the original order: the second call runs only if the first succeeded
  if (qRestoreStreamOperatorOption(pExec) < 0 || qStreamRecoverFinish(pExec) < 0) {
    return -1;
  }

  streamSetStatusNormal(pTask);
  return 0;
}

// Handle a "scan-history finished" notification from one upstream task.
// Only agg-level tasks act on it: the counter of upstream tasks still being
// waited for is decremented atomically, and when it reaches 0 the agg task
// finishes its own recover procedure.
//
// childId is not used by this function. Always returns 0.
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
  if (pTask->info.taskLevel != TASK_LEVEL__AGG) {
    return 0;
  }

  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);

  if (left == 0) {
    // Report the number of upstream tasks, not the remaining count (which is 0 here).
    // The counter is initialized from this same list in streamAggRecoverPrepare().
    int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
    qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfUpstream);
    streamAggChildrenRecoverFinish(pTask);
  } else {
    qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left);
  }

  return 0;
}

427 428 429 430
// Set the history task's version range to [0, pTask->chkInfo.currentVer], log
// the resulting launch condition, and start checking the history task's
// downstream tasks.
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  SHistDataRange* pHRange = &pHTask->dataRange;

  pHRange->range.minVer = 0;
  pHRange->range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  } else {
    qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHRange->window.skey, pHRange->window.ekey,
           pHRange->range.minVer, pHRange->range.maxVer);
  }

  // check if downstream tasks have been ready
  streamTaskCheckDownstreamTasks(pHTask);
}

static void tryLaunchHistoryTask(void* param, void* tmrId) {
  SStreamTask* pTask = param;

  SStreamMeta* pMeta = pTask->pMeta;
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
          pMeta->vgId, pTask->historyTaskId.taskId);

453
    taosTmrReset(tryLaunchHistoryTask, 100, pTask, streamEnv.timer, &pTask->timer);
454 455 456 457 458 459 460 461
    return;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
}

// TODO: fix bug 2 -- race condition.
// A fill-history task needs to be started.
462
// Start the fill-history task associated with pTask, if there is one.
// Nothing happens when historyTaskId.taskId is 0. If the history task is not in
// the meta's task table yet and no timer exists for this task, a timer is
// started that calls tryLaunchHistoryTask after 100ms.
// Always returns TSDB_CODE_SUCCESS.
int32_t streamTaskStartHistoryTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  if (pTask->historyTaskId.taskId == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** ppHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
  if (ppHTask != NULL) {
    doCheckDownstreamStatus(pTask, *ppHTask);
    return TSDB_CODE_SUCCESS;
  }

  qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
        pMeta->vgId, pTask->historyTaskId.taskId);

  if (pTask->timer == NULL) {
    pTask->timer = taosTmrStart(tryLaunchHistoryTask,  100, pTask, streamEnv.timer);
    if (pTask->timer == NULL) {
      // todo failed to create timer
    }
  }

  // the timer callback tries again in 100ms
  return TSDB_CODE_SUCCESS;
}

489 490 491 492 493 494 495 496 497 498 499 500 501
// Wrap up a finished history scan: restore the executor parameters, notify the
// downstream tasks, switch the task to normal status, mark it inactive for
// scheduling and save it through the stream meta.
// Does nothing (returns 0) when the task is being dropped; returns -1 if the
// restore or the dispatch step reports a negative code.
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchScanHistoryFinishMsg(pTask) < 0) {
    return -1;
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  streamSetStatusNormal(pTask);  // return value intentionally not used

  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // todo check rsp
  streamMetaSaveTask(pMeta, pTask);
  return 0;
}

517
// Encode a task check request. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId.
// Returns the encoder position after encoding, or -1 on the first failing step.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  // short-circuit evaluation stops at the first step that fails
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

530
// Decode a task check request. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId.
// Returns 0 on success, -1 on the first failing step.
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  // short-circuit evaluation stops at the first step that fails
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

543
// Encode a task check response. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId, status.
// Returns the encoder position after encoding, or -1 on the first failing step.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  // short-circuit evaluation stops at the first step that fails
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
      tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

557
// Decode a task check response. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId, status.
// Returns 0 on success, -1 on the first failing step.
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  // short-circuit evaluation stops at the first step that fails
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
      tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

H
Haojun Liao 已提交
571
// Encode a recover-finish request. Field order: streamId, taskId, childId.
// Returns the encoder position after encoding, or -1 on the first failing step.
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
  // short-circuit evaluation stops at the first step that fails
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
H
Haojun Liao 已提交
579
// Decode a recover-finish request. Field order: streamId, taskId, childId.
// Returns 0 on success, -1 on the first failing step.
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
  // short-circuit evaluation stops at the first step that fails
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->taskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}