streamRecover.c 25.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

L
liuyao 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
/*
 * Build a scan-history request for a source task and post it to the task's STREAM_QUEUE.
 *
 * @param pTask       the task whose history data should be scanned
 * @param igUntreated copied verbatim into the request's igUntreated field
 * @return 0 when the message was queued; -1 if the message buffer could not be allocated or the
 *         message could not be put into the queue.
 */
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  // Zero-initialize: the request is memcpy'd into the message as raw bytes, so any field the
  // builder does not set (e.g. msgHead.contLen) and any padding must not be uninitialized stack data.
  SStreamScanHistoryReq req = {0};
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);
  int32_t len = sizeof(SStreamScanHistoryReq);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    // Previously ignored (returned 0), making callers believe the scan-history stage was started.
    // NOTE(review): ownership of rpcMsg.pCont on failure is decided by tmsgPutToQueue (not visible
    // here), so the buffer is intentionally not freed in this function — confirm.
    qError("s-task:%s failed to put scan-history msg into stream queue", pTask->id.idStr);
    return -1;
  }

  return 0;
}

40 41 42
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
43
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
44
    case TASK_STATUS__HALT: return "halt";
45
    case TASK_STATUS__PAUSE: return "paused";
46 47 48
    default:return "";
  }
}
49

50 51
/*
 * Start the scan-history stage of a source task.
 *
 * The function logs the version range, switches the executor's operators into scan-history mode,
 * configures step 1 of the scanner over the task's version range and time window, and queues the
 * scan-history request.
 *
 * @return the result of streamStartRecoverTask()
 */
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SHistDataRange* pDataRange = &pTask->dataRange;
  SVersionRange*  pVerRange = &pDataRange->range;
  const char*     pStatusStr = streamGetTaskStatusStr(pTask->status.taskStatus);

  qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
         pTask->info.nodeId, pStatusStr, pVerRange->minVer, pVerRange->maxVer);

  streamSetParamForScanHistoryData(pTask);
  streamSetParamForStreamScannerStep1(pTask, pVerRange, &pDataRange->window);

  return streamStartRecoverTask(pTask, 0);
}
62

63 64 65 66 67 68 69 70 71 72
/*
 * Move a task into its scan-history stage according to its level.
 *
 *  - source: if the status is TASK_STATUS__SCAN_HISTORY, launch the scan. Otherwise the status
 *            must be normal, and the function only logs that no scan is needed.
 *  - agg:    switch to normal status, set the scan-history operator options and record how many
 *            upstream tasks must report scan-history completion.
 *  - sink:   switch to normal status immediately.
 *
 * @return the launch result for a source task in scan-history status, 0 otherwise
 */
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  switch (pTask->info.taskLevel) {
    case TASK_LEVEL__SOURCE: {
      if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        return doLaunchScanHistoryTask(pTask);
      }

      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
      qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
             walReaderGetCurrentVer(pTask->exec.pWalReader));
      break;
    }
    case TASK_LEVEL__AGG:
      streamSetStatusNormal(pTask);
      streamSetParamForScanHistoryData(pTask);
      streamAggRecoverPrepare(pTask);
      break;
    case TASK_LEVEL__SINK:
      streamSetStatusNormal(pTask);
      qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
      break;
    default:
      break;
  }

  return 0;
}

85
// check status
86
/*
 * Ask every downstream task of pTask whether it is ready.
 *
 *  - fixed dispatch:   one check request; its id is kept in pTask->checkReqId.
 *  - shuffle dispatch: one request per vgroup; the ids are kept in pTask->checkReqIds and
 *                      pTask->notReadyTasks is set to the number of vgroups.
 *  - no dispatcher:    downstream is marked ready at once and the scan-history stage is launched.
 *
 * streamProcessCheckRsp() validates responses against the recorded request ids.
 *
 * @return 0 on success; -1 (terrno set) if the shuffle request-id list cannot be allocated.
 */
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  STimeWindow*    pWindow = &pRange->window;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    // Allocate before publishing the not-ready counter. Without this list no check response could
    // ever be matched, and the task would silently never become ready.
    SArray* pReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
    if (pReqIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("s-task:%s failed to init check req id list for %d downstream tasks", pTask->id.idStr, numOfVgs);
      return -1;
    }

    pTask->checkReqIds = pReqIds;
    pTask->notReadyTasks = numOfVgs;

    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    pTask->status.downstreamReady = 1;
    qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));

    streamTaskLaunchScanHistory(pTask);
  }

  return 0;
}

141
/*
 * Re-send a downstream check request that previously got a "not ready" response.
 *
 * The request is rebuilt from the response fields, keeping the same reqId. For shuffle dispatch
 * it is sent to every vgroup entry whose taskId matches the downstream task.
 *
 * @return always 0
 */
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  switch (pTask->outputType) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH: {
      SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
      int32_t numOfVgroups = taosArrayGetSize(pVgroups);

      for (int32_t idx = 0; idx < numOfVgroups; ++idx) {
        SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
        if (pVg->taskId != req.downstreamTaskId) {
          continue;
        }
        streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVg->epSet);
      }
      break;
    }
    default:
      break;
  }

  return 0;
}

172 173
/* Return 1 if the task status (read atomically) is TASK_STATUS__NORMAL, 0 otherwise. */
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  int8_t status = atomic_load_8(&pTask->status.taskStatus);
  if (status == TASK_STATUS__NORMAL) {
    return 1;
  }
  return 0;
}

176
/*
 * Handle a downstream check response.
 *
 * status == 1 (downstream ready):
 *  - shuffle dispatch: the response must carry one of the ids in pTask->checkReqIds. Each accepted
 *    response decrements pTask->notReadyTasks. When it reaches 0, the id list is released,
 *    downstream is marked ready and, if the task is in scan-history status, that stage is launched.
 *  - fixed dispatch: the response must carry pTask->checkReqId. Downstream is then marked ready
 *    and the scan-history stage is launched if applicable.
 * any other status: wait 100ms and re-send the check request.
 *
 * @return -1 if the response does not belong to an outstanding request, 0 otherwise
 */
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* id = pTask->id.idStr;

  if (pRsp->status == 1) {
    if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      bool found = false;

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
      for (int32_t i = 0; i < numOfReqs; i++) {
        int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
        if (reqId == pRsp->reqId) {
          found = true;
          break;
        }
      }

      if (!found) {
        return -1;
      }

      int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
      ASSERT(left >= 0);

      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
        pTask->checkReqIds = NULL;

        // Set the downstream-ready flag once every shuffle target has answered. The fixed-dispatch
        // and no-downstream paths also set it; this path used to omit it.
        pTask->status.downstreamReady = 1;

        if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
          qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs,
                 streamGetTaskStatusStr(pTask->status.taskStatus));
          streamTaskLaunchScanHistory(pTask);
        } else {
          ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
          qDebug("s-task:%s all %d downstream tasks are ready, now ready for data from wal, status:%s", id, numOfReqs,
                 streamGetTaskStatusStr(pTask->status.taskStatus));
        }
      } else {
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
      }
    } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

      // set the downstream tasks have been checked flag
      ASSERT(pTask->status.downstreamReady == 0);
      pTask->status.downstreamReady = 1;

      ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
      if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
               streamGetTaskStatusStr(pTask->status.taskStatus));
        streamTaskLaunchScanHistory(pTask);
      } else {
        qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
               streamGetTaskStatusStr(pTask->status.taskStatus));
      }
    } else {
      ASSERT(0);
    }
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
    // NOTE(review): this blocks the calling thread for 100ms; a timer-based retry would avoid that.
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
  }

  return 0;
}

250
// common
251
/* Switch the task's executor operators into scan-history mode; returns the executor's result code. */
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;
  qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
  return qSetStreamOperatorOptionForScanHistory(pExecutor);
}
255

256
/* Restore the executor's operator options after the scan-history stage; returns the executor's result code. */
int32_t streamRestoreParam(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;
  qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr);
  return qRestoreStreamOperatorOption(pExecutor);
}
260

261
/* Atomically set the task status to TASK_STATUS__NORMAL, logging the previous status. Always returns 0. */
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  const char* pPrevStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
  qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, pPrevStatus);

  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  return 0;
}

// source
L
liuyao 已提交
268 269 270 271 272 273
/* Configure step 1 of the history scan over the given version range and time window. */
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep1(pExecutor, pVerRange, pWindow);
}

/* Configure step 2 of the history scan over the given version range and time window. */
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep2(pExecutor, pVerRange, pWindow);
}

L
liuyao 已提交
276
/*
 * Fill a scan-history request with the task's node id, stream id and task id, plus igUntreated.
 * Other fields of *pReq are left untouched. Always returns 0.
 */
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  SStreamScanHistoryReq* pOut = pReq;

  pOut->msgHead.vgId = pTask->info.nodeId;
  pOut->streamId = pTask->id.streamId;
  pOut->taskId = pTask->id.taskId;
  pOut->igUntreated = igUntreated;

  return 0;
}

284
/* Run the scan-history loop of a source task via streamScanExec(). */
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  // NOTE(review): presumably the batch size per scan round — confirm against streamScanExec()
  const int32_t batchSz = 100;
  return streamScanExec(pTask, batchSz);
}

288
/*
 * Tell every downstream task that this task has finished scanning history data.
 *
 * One request (stream id + this task's child id) is reused for all targets; only taskId changes.
 * Tasks without a fixed or shuffle dispatcher send nothing.
 *
 * @return always 0; per-target send results are not inspected
 */
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  switch (pTask->outputType) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      req.taskId = pTask->fixedEpDispatcher.taskId;
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH: {
      SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
      int32_t numOfVgs = taosArrayGetSize(pVgroups);

      qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
             numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
      for (int32_t idx = 0; idx < numOfVgs; ++idx) {
        SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
        req.taskId = pVg->taskId;
        streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVg->vgId, &pVg->epSet);
      }
      break;
    }
    default:
      break;
  }

  return 0;
}

H
Haojun Liao 已提交
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
/*
 * Encode a transfer-state request and send it to one downstream task without waiting for a reply.
 *
 * The message is an SMsgHead (vgId in network byte order) followed by the encoded request. The
 * request is encoded with tEncodeStreamRecoverFinishReq.
 *
 * @param pTask  sending task (used for logging)
 * @param pReq   request to send; pReq->taskId identifies the target task
 * @param vgId   target vgroup id, written into the message head
 * @param pEpSet endpoints of the target vgroup
 * @return 0 once the message is handed to tmsgSendReq; -1 or the encoder's negative code on failure
 */
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamRecoverFinishReq(&encoder, pReq);

  // Clear the encoder on both paths. The failure path used to return without clearing it.
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    return code;
  }

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);

  return 0;
}

/*
 * Send a transfer-state message to every downstream task (fixed or shuffle dispatch).
 * Tasks without either dispatcher send nothing.
 *
 * @return always 0; per-target results of doDispatchTransferMsg() are not inspected
 */
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  switch (pTask->outputType) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
             pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));

      req.taskId = pTask->fixedEpDispatcher.taskId;
      doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH: {
      SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
      int32_t numOfVgs = taosArrayGetSize(pVgroups);

      for (int32_t idx = 0; idx < numOfVgs; ++idx) {
        SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
        req.taskId = pVg->taskId;
        doDispatchTransferMsg(pTask, &req, pVg->vgId, &pVg->epSet);
      }
      break;
    }
    default:
      break;
  }

  return 0;
}

377 378
// agg
/*
 * Set the number of upstream tasks an agg task must hear from before its scan-history stage ends.
 * The count is the size of the upstream endpoint list. Always returns 0.
 */
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  pTask->numOfWaitingUpstream = numOfUpstream;

  qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
         pTask->numOfWaitingUpstream);
  return 0;
}

385
/*
 * Finish the scan-history stage of an agg task after all upstream tasks reported completion.
 *
 * The function restores the executor's operator options and then calls qStreamRecoverFinish().
 * The task status is not changed here.
 *
 * @return 0 on success, -1 if either executor call fails (the second is skipped if the first fails)
 */
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  if (qRestoreStreamOperatorOption(pExecutor) < 0 || qStreamRecoverFinish(pExecutor) < 0) {
    return -1;
  }

  // NOTE(review): the original kept `streamSetStatusNormal(pTask);` commented out here — confirm
  // whether the status switch is intentionally left to another step.
  return 0;
}

399
/*
 * Handle a scan-history-finish message from an upstream task.
 *
 * Only agg tasks act on it. Each message decrements numOfWaitingUpstream. When the count reaches
 * 0, streamAggUpstreamScanHistoryFinish() is called. Always returns 0.
 *
 * @param taskId  id of the upstream task that finished (logging only)
 * @param childId child index of that upstream task (logging only)
 */
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
  if (pTask->info.taskLevel != TASK_LEVEL__AGG) {
    return 0;
  }

  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);

  if (left != 0) {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, taskId, childId, left);
    return 0;
  }

  int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks);
  streamAggUpstreamScanHistoryFinish(pTask);
  return 0;
}

417 418 419 420
/*
 * Set the version range of the fill-history task pHTask to [0, pTask's current checkpoint
 * version], then start checking pHTask's downstream tasks.
 */
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  SHistDataRange* pHRange = &pHTask->dataRange;
  pHRange->range.minVer = 0;
  pHRange->range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  } else {
    qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHRange->window.skey, pHRange->window.ekey, pHRange->range.minVer,
           pHRange->range.maxVer);
  }

  // check if downstream tasks have been ready
  streamTaskCheckDownstreamTasks(pHTask);
}

H
Haojun Liao 已提交
434 435 436 437 438
/*
 * Timer parameter for tryLaunchHistoryTask(). It identifies the task to retry by id; the callback
 * looks the task up again with streamMetaAcquireTask(). Allocated in
 * streamCheckHistoryTaskDownstrem() and freed by tryLaunchHistoryTask() when it stops retrying.
 */
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;   // meta the task is looked up in
  int32_t taskId;       // id of the task whose fill-history task should be launched
} SStreamTaskRetryInfo;

439
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
440 441 442
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

443
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
444 445 446 447 448

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
449

H
Haojun Liao 已提交
450
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
451 452 453
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
454 455 456 457 458 459
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
460

H
Haojun Liao 已提交
461 462 463
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
464

H
Haojun Liao 已提交
465 466
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
467 468 469 470 471 472
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or have been "
          "destroyed, or should stop exec",
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
473

474
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
475
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
476 477 478
      return;
    }

479 480 481 482
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
483 484 485 486 487

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
488
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
489
  }
H
Haojun Liao 已提交
490 491

  taosMemoryFree(pInfo);
492 493 494 495
}

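// todo fix the bug: 2. race condition (NOTE(review): presumably the pMeta->pTasks lookup below, done without pMeta->lock)
// Launch the fill-history task of pTask: check its downstream tasks now, or retry via a timer if it is not built yet.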
496
/*
 * Start the downstream check of pTask's fill-history task.
 *
 * If the fill-history task is not in pMeta->pTasks yet, a retry is scheduled. tryLaunchHistoryTask
 * runs after 100ms with a freshly allocated SStreamTaskRetryInfo; the existing launch timer is
 * reset if there is one.
 *
 * @return TSDB_CODE_SUCCESS when the check started or a retry was scheduled; -1 if the retry info
 *         cannot be allocated (terrno set) or the timer cannot be started.
 */
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
          pMeta->vgId, hTaskId);

    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
    if (pInfo == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("s-task:%s failed to alloc retry info to launch history task:0x%x", pTask->id.idStr, hTaskId);
      return -1;
    }

    pInfo->taskId = pTask->id.taskId;
    pInfo->pMeta = pTask->pMeta;

    if (pTask->launchTaskTimer == NULL) {
      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
      if (pTask->launchTaskTimer == NULL) {
        // The callback will never run, so it can never free pInfo. Release it here.
        qError("s-task:%s failed to start timer to launch history task:0x%x", pTask->id.idStr, hTaskId);
        taosMemoryFree(pInfo);
        return -1;
      }

      pTask->status.timerActive = 1;  // timer is active
      qDebug("s-task:%s set timer active flag", pTask->id.idStr);
    } else {  // timer exists
      pTask->status.timerActive = 1;
      qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
    }

    // try again in 100ms
    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}

532 533 534 535 536 537 538 539 540 541 542 543 544
/*
 * Finish the scan-history stage of a task.
 *
 * The function restores the operator options, notifies downstream tasks, switches the task to
 * normal status with an inactive schedule status, and saves the task to the meta. A task that is
 * being dropped is left alone.
 *
 * @return 0 on success (or when dropping), -1 if restoring params or dispatching fails
 */
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  if (streamRestoreParam(pTask) < 0) {
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchScanHistoryFinishMsg(pTask) < 0) {
    return -1;
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);

  // ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // todo check rsp, commit data
  streamMetaSaveTask(pMeta, pTask);
  return 0;
}

L
liuyao 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
/* Report whether the executor has finished step 1 of the history scan. */
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep1Finished(pTask->exec.pExecutor);
}

/* Report whether the executor has finished step 2 of the history scan. */
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep2Finished(pTask->exec.pExecutor);
}

/* Mark all history-scan steps as finished in the executor; returns the executor's result code. */
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  return qStreamRecoverSetAllStepFinished(pTask->exec.pExecutor);
}

576
/* Serialize a downstream check request. Returns the encoder position after writing, or -1 on failure. */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // field order is the wire format and must match tDecodeStreamTaskCheckReq
  if (tEncodeI64(pEncoder, pReq->reqId) < 0 || tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 || tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 || tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

589
/* Deserialize a downstream check request written by tEncodeStreamTaskCheckReq. Returns 0, or -1 on failure. */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &pReq->reqId) < 0 || tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 || tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 || tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

602
/* Serialize a downstream check response. Returns the encoder position after writing, or -1 on failure. */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // field order is the wire format and must match tDecodeStreamTaskCheckRsp
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0 || tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 || tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 || tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 || tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

616
/* Deserialize a downstream check response written by tEncodeStreamTaskCheckRsp. Returns 0, or -1 on failure. */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0 || tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 || tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 || tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 || tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

H
Haojun Liao 已提交
630
/*
 * Serialize a scan-history-finish request (streamId, taskId, childId).
 * Returns the encoder position after writing, or -1 on failure.
 */
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;

  if (tEncodeI64(pEncoder, pReq->streamId) < 0 || tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
H
Haojun Liao 已提交
638
/* Deserialize a scan-history-finish request written by tEncodeStreamRecoverFinishReq. Returns 0, or -1 on failure. */
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 || tDecodeI32(pDecoder, &pReq->taskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
646

H
Haojun Liao 已提交
647
// todo handle race condition, this task may be destroyed
648 649 650 651 652 653 654 655 656 657 658
/*
 * Prepare a task's data range and start checking its downstream tasks.
 *
 * A fill-history task does nothing here; it waits to be launched. For any other task:
 *  - If it has an associated fill-history task, that task's downstream check is started first.
 *    Then this task's time window is set to [old ekey + 1, INT64_MAX] and its version range to
 *    [0, old minVer].
 *  - Finally this task's own downstream tasks are checked.
 */
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
    return;
  }

  SHistDataRange* pRange = &pTask->dataRange;

  if (pTask->historyTaskId.taskId == 0) {
    qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
           ", ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
  } else {
    // check downstream tasks for associated scan-history-data tasks
    streamCheckHistoryTaskDownstrem(pTask);

    // calculate the start window/version range of the current task from its old range
    int64_t nextSkey = pRange->window.ekey + 1;
    int64_t oldMinVer = pRange->range.minVer;

    pRange->window.skey = nextSkey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = oldMinVer;

    qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
               ", ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }

  ASSERT(pTask->status.downstreamReady == 0);

  // check downstream tasks for itself
  streamTaskCheckDownstreamTasks(pTask);
}