streamRecover.c 33.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

20 21 22 23 24 25
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);

static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;
H
Haojun Liao 已提交
26
  int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
27 28

  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
H
Haojun Liao 已提交
29
         pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
30 31
}

L
liuyao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  SStreamScanHistoryReq req;
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);
  int32_t len = sizeof(SStreamScanHistoryReq);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    /*ASSERT(0);*/
  }

  return 0;
}

52 53 54
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
55
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
56
    case TASK_STATUS__HALT: return "halt";
57
    case TASK_STATUS__PAUSE: return "paused";
58
    case TASK_STATUS__DROPPING: return "dropping";
59 60 61
    default:return "";
  }
}
62

63 64
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pRange = &pTask->dataRange.range;
L
liuyao 已提交
65 66 67
  if (pTask->info.fillHistory) {
    streamSetParamForScanHistory(pTask);
  }
L
liuyao 已提交
68
  streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
69

L
liuyao 已提交
70 71
  int32_t code = streamStartRecoverTask(pTask, 0);
  return code;
72
}
73

74 75 76
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
77
      return doLaunchScanHistoryTask(pTask);
78 79 80 81 82 83
    } else {
      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
      qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
             walReaderGetCurrentVer(pTask->exec.pWalReader));
    }
84
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
L
liuyao 已提交
85 86 87
    if (pTask->info.fillHistory) {
      streamSetParamForScanHistory(pTask);
    }
88
    streamTaskEnablePause(pTask);
89
    streamTaskScanHistoryPrepare(pTask);
90
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
91
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
92
    streamTaskScanHistoryPrepare(pTask);
93 94 95 96
  }
  return 0;
}

97
// check status
98
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
99
  SHistDataRange* pRange = &pTask->dataRange;
100
  STimeWindow*    pWindow = &pRange->window;
101

102
  SStreamTaskCheckReq req = {
103 104
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
105 106
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
107
  };
108

109
  // serialize
110
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
111 112 113 114 115
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

116 117 118 119 120
    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

121
    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
122
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
123
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
124

125
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
126
    pTask->notReadyTasks = numOfVgs;
127
    pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
128

129 130 131
    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

132
    for (int32_t i = 0; i < numOfVgs; i++) {
133 134 135 136 137
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
138 139
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
140
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
141 142
    }
  } else {
143 144 145 146
    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);

    streamTaskSetForReady(pTask, 0);
    streamTaskSetRangeStreamCalc(pTask);
147
    streamTaskLaunchScanHistory(pTask);
148

149
    launchFillHistoryTask(pTask);
150
  }
151

152 153 154
  return 0;
}

155
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
156 157 158 159 160 161 162 163 164
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };
165

166
  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
167
         req.downstreamTaskId, req.downstreamNodeId);
168

169
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
170
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
171
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
172
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
173 174 175

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
176 177
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo->taskId == req.downstreamTaskId) {
178
        streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
179 180 181
      }
    }
  }
182

183 184 185
  return 0;
}

186
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
187 188 189 190 191 192 193 194 195 196 197 198 199 200
  return (pTask->status.downstreamReady == 1)? 1:0;
}

static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
  streamTaskSetForReady(pTask, numOfReqs);
  const char* id = pTask->id.idStr;

  int8_t      status = pTask->status.taskStatus;
  const char* str = streamGetTaskStatusStr(status);

  ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (status == TASK_STATUS__SCAN_HISTORY) {
201
    qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
202 203 204 205 206 207 208
    streamTaskLaunchScanHistory(pTask);
  } else {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
  }

  // when current stream task is ready, check the related fill history task.
  launchFillHistoryTask(pTask);
209 210
}

211
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
212
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
213
  const char* id = pTask->id.idStr;
214

215
  if (pRsp->status == 1) {
216
    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
217
      bool found = false;
218 219 220

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
      for (int32_t i = 0; i < numOfReqs; i++) {
221 222 223 224 225 226
        int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
        if (reqId == pRsp->reqId) {
          found = true;
          break;
        }
      }
227 228 229 230 231

      if (!found) {
        return -1;
      }

232
      int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
233
      ASSERT(left >= 0);
234

235 236
      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
L
Liu Jicong 已提交
237
        pTask->checkReqIds = NULL;
238 239

        doProcessDownstreamReadyRsp(pTask, numOfReqs);
240
      } else {
241 242 243
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
244
      }
245
    } else {
246
      ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
247 248 249 250
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

251
      doProcessDownstreamReadyRsp(pTask, 1);
252
    }
253 254 255
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
256
    taosMsleep(100);
257

258
    streamRecheckDownstream(pTask, pRsp);
259
  }
260

261 262 263
  return 0;
}

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo *pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

290
// common
291
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
292
  qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
293
  return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
294
}
295

296
int32_t streamRestoreParam(SStreamTask* pTask) {
297
  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
298
  return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
299
}
300

301
int32_t streamSetStatusNormal(SStreamTask* pTask) {
302 303 304 305 306 307 308 309 310
  int32_t status = atomic_load_8(&pTask->status.taskStatus);
  if (status == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  } else {
    qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    return 0;
  }
311 312 313
}

// source
L
liuyao 已提交
314 315 316 317 318 319
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
}

int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
320 321
}

L
liuyao 已提交
322
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
323
  pReq->msgHead.vgId = pTask->info.nodeId;
324 325
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
L
liuyao 已提交
326
  pReq->igUntreated = igUntreated;
327 328 329
  return 0;
}

330
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
331 332 333
  return streamScanExec(pTask, 100);
}

334
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
335 336 337 338 339 340
  SStreamScanHistoryFinishReq req = {
      .streamId = pTask->id.streamId,
      .childId = pTask->info.selfChildId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->pMeta->vgId,
  };
H
Haojun Liao 已提交
341

L
Liu Jicong 已提交
342
  // serialize
343
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
344 345
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
346
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
347
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
348
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
349
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
350
    pTask->notReadyTasks = numOfVgs;
351

352
    qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
353 354
           numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
    for (int32_t i = 0; i < numOfVgs; i++) {
L
Liu Jicong 已提交
355
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
356
      req.downstreamTaskId = pVgInfo->taskId;
357
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
L
Liu Jicong 已提交
358
    }
359
  } else {
360
    qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
361
    streamProcessScanHistoryFinishRsp(pTask);
362
  }
363

364 365 366
  return 0;
}

H
Haojun Liao 已提交
367 368 369 370 371 372
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
373
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
H
Haojun Liao 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
389
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
390 391 392 393 394 395 396 397 398 399 400 401 402 403
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
H
Haojun Liao 已提交
404
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
405
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);
H
Haojun Liao 已提交
406 407 408 409 410 411 412 413

  return 0;
}

int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
414
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
415
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
H
Haojun Liao 已提交
416
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
417
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
H
Haojun Liao 已提交
418 419 420 421 422
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
423
      req.downstreamTaskId = pVgInfo->taskId;
H
Haojun Liao 已提交
424 425 426 427 428 429 430
      doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  }

  return 0;
}

431
// agg
432
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
433
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
434 435 436
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));
437 438 439
  return 0;
}

440
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
441
  void* exec = pTask->exec.pExecutor;
L
liuyao 已提交
442
  if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
443 444
    return -1;
  }
445

446 447 448 449 450 451
  if (qStreamRecoverFinish(exec) < 0) {
    return -1;
  }
  return 0;
}

452 453 454 455 456 457 458
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t taskLevel = pTask->info.taskLevel;
  ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);

  // sink node do not send end of scan history msg to its upstream, which is agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
459

460 461
  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);
462

463 464 465 466 467 468 469 470
  if (left == 0) {
    int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
    qDebug(
        "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
        "rsp to all upstream tasks",
        pTask->id.idStr, numOfTasks);

    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
471
      streamAggUpstreamScanHistoryFinish(pTask);
L
Liu Jicong 已提交
472
    }
473

474
    streamNotifyUpstreamContinue(pTask);
H
Haojun Liao 已提交
475

476
    // sink node does not receive the pause msg from mnode, so does not need enable it
H
Haojun Liao 已提交
477 478 479
    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamTaskEnablePause(pTask);
    }
480 481 482
  } else {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
483
  }
484

485 486 487
  return 0;
}

488 489 490 491 492 493 494 495 496 497
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  SStreamMeta* pMeta = pTask->pMeta;

  // execute in the scan history complete call back msg, ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
498
  streamMetaCommit(pMeta);
499 500
  taosWUnLockLatch(&pMeta->lock);

501
  // history data scan in the stream time window finished, now let's enable the pause
H
Haojun Liao 已提交
502 503
  streamTaskEnablePause(pTask);

504
  // for source tasks, let's continue execute.
505 506 507 508 509 510 511
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    streamSchedExec(pTask);
  }

  return TSDB_CODE_SUCCESS;
}

512 513 514 515
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;

516
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
517
    qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
518
           " ver range:%" PRId64 " - %" PRId64,
519 520 521 522 523
           pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
           pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
  } else {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  }
524 525

  // check if downstream tasks have been ready
526
  streamTaskDoCheckDownstreamTasks(pHTask);
527 528
}

H
Haojun Liao 已提交
529 530 531 532 533
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;
  int32_t taskId;
} SStreamTaskRetryInfo;

534
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
535 536 537
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

538
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
539 540 541 542 543

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
544

H
Haojun Liao 已提交
545
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
546 547 548
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
549
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
550 551 552 553 554 555
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
556

H
Haojun Liao 已提交
557 558 559
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
560

H
Haojun Liao 已提交
561 562
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
563 564 565
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
566 567
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
568
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
569

570
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
571
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
572 573 574
      return;
    }

575 576 577 578
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
579 580 581 582 583

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
584
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
585
  }
H
Haojun Liao 已提交
586 587

  taosMemoryFree(pInfo);
588 589 590 591
}

// todo fix the bug: 2. race condition
// an fill history task needs to be started.
592
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
593
  SStreamMeta* pMeta = pTask->pMeta;
H
Haojun Liao 已提交
594
  int32_t      hTaskId = pTask->historyTaskId.taskId;
595 596

  // Set the execute conditions, including the query time window and the version range
H
Haojun Liao 已提交
597
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
598 599
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
H
Haojun Liao 已提交
600
          pMeta->vgId, hTaskId);
601

H
Haojun Liao 已提交
602 603 604
    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
    pInfo->taskId = pTask->id.taskId;
    pInfo->pMeta = pTask->pMeta;
H
Haojun Liao 已提交
605

606 607 608
    if (pTask->launchTaskTimer == NULL) {
      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask,  100, pInfo, streamEnv.timer);
      if (pTask->launchTaskTimer == NULL) {
609
        // todo failed to create timer
H
Haojun Liao 已提交
610
        taosMemoryFree(pInfo);
H
Haojun Liao 已提交
611 612
      } else {
        pTask->status.timerActive = 1;  // timer is active
613
        qDebug("s-task:%s set timer active flag", pTask->id.idStr);
614
      }
H
Haojun Liao 已提交
615 616
    } else {  // timer exists
      pTask->status.timerActive = 1;
617 618
      qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
619 620 621 622 623 624 625 626 627 628
    }

    // try again in 500ms
    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}

629 630 631 632 633 634
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
L
liuyao 已提交
635 636 637 638 639 640
  int32_t code = 0;
  if (pTask->info.fillHistory) {
    code = streamRestoreParam(pTask);
    if (code < 0) {
      return -1;
    }
641 642 643
  }

  // dispatch recover finish req to all related downstream task
644
  code = streamDispatchScanHistoryFinishMsg(pTask);
645 646 647 648 649 650 651
  if (code < 0) {
    return -1;
  }

  return 0;
}

L
liuyao 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamRecoverScanStep1Finished(exec);
}

bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamRecoverScanStep2Finished(exec);
}

int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamRecoverSetAllStepFinished(exec);
}

667
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
668 669 670 671 672 673 674 675
  SVersionRange* pRange = &pTask->dataRange.range;
  ASSERT(latestVer >= pRange->maxVer);

  int64_t nextStartVer = pRange->maxVer + 1;
  if (nextStartVer > latestVer - 1) {
    // no input data yet. no need to execute the secondardy scan while stream task halt
    streamTaskRecoverSetAllStepFinished(pTask);
    qDebug(
676 677 678 679
        "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
        "related stream task currentVer:%" PRId64,
        pTask->id.idStr, latestVer);
    return true;
680 681 682 683 684
  } else {
    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
    // [pTask->dataRange.range.maxVer, ver1]
    pRange->minVer = nextStartVer;
    pRange->maxVer = latestVer - 1;
685
    return false;
686 687 688 689
  }
}


690
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
691
  if (tStartEncode(pEncoder) < 0) return -1;
692
  if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
693
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
694
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
695 696 697 698
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
699 700 701 702
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

703
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
L
Liu Jicong 已提交
704
  if (tStartDecode(pDecoder) < 0) return -1;
705
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
L
Liu Jicong 已提交
706
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
707
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
708 709 710 711
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
712 713 714 715
  tEndDecode(pDecoder);
  return 0;
}

716
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
L
Liu Jicong 已提交
717
  if (tStartEncode(pEncoder) < 0) return -1;
718
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
L
Liu Jicong 已提交
719
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
720 721 722 723 724 725
  if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
  if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
L
Liu Jicong 已提交
726 727 728 729
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

730
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
L
Liu Jicong 已提交
731
  if (tStartDecode(pDecoder) < 0) return -1;
732 733 734 735 736 737 738 739
  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
  if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
L
Liu Jicong 已提交
740 741 742 743
  tEndDecode(pDecoder);
  return 0;
}

744
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
L
Liu Jicong 已提交
745 746
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
747 748 749
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
750
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
751 752 753
  tEndEncode(pEncoder);
  return pEncoder->pos;
}
754

755
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
L
Liu Jicong 已提交
756 757
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
758 759 760
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
761
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
762 763 764
  tEndDecode(pDecoder);
  return 0;
}
765

766 767 768
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  if (pTask->historyTaskId.taskId == 0) {
    SHistDataRange* pRange = &pTask->dataRange;
769 770 771 772 773 774 775 776 777
    if (pTask->info.fillHistory == 1) {
      qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
                 "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    } else {
      qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
             "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    }
778 779 780
  } else {
    SHistDataRange* pRange = &pTask->dataRange;

781 782 783 784 785 786 787
    int64_t ekey = 0;
    if (pRange->window.ekey < INT64_MAX) {
      ekey = pRange->window.ekey + 1;
    } else {
      ekey = pRange->window.ekey;
    }

788 789 790 791 792 793 794
    int64_t ver = pRange->range.minVer;

    pRange->window.skey = ekey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = ver;

795
    qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815
           ", verRang:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }
}

void launchFillHistoryTask(SStreamTask* pTask) {
  int32_t tId = pTask->historyTaskId.taskId;
  if (tId == 0) {
    return;
  }

  ASSERT(pTask->status.downstreamReady == 1);
  qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);

  // launch associated fill history task
  streamLaunchFillHistoryTask(pTask);
}

void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
816 817
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
818 819 820 821
    return;
  }

  ASSERT(pTask->status.downstreamReady == 0);
822

823 824
  // check downstream tasks for itself
  streamTaskDoCheckDownstreamTasks(pTask);
825
}
826

827
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
828 829 830 831
void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  int64_t st = taosGetTimestampMs();
832 833 834 835 836 837 838 839 840 841 842 843 844

  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING) {
    qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

  const char* str = streamGetTaskStatusStr(status);
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
    qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
    return;
  }

845
  while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
846 847
    status = pTask->status.taskStatus;
    if (status == TASK_STATUS__DROPPING) {
848 849 850 851
      qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
      return;
    }

852 853 854 855 856
    if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
      qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
      return;
    }

857 858
    const char* pStatus = streamGetTaskStatusStr(status);
    qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
859 860 861
    taosMsleep(100);
  }

862 863 864 865 866 867 868 869 870 871
  // todo: use the lock of the task.
  taosWLockLatch(&pMeta->lock);

  status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    taosWUnLockLatch(&pMeta->lock);
    qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

872 873
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
874
  taosWUnLockLatch(&pMeta->lock);
875 876 877

  int64_t el = taosGetTimestampMs() - st;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
H
Haojun Liao 已提交
878
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
879 880
}

881 882 883 884 885 886 887 888 889 890 891
void streamTaskResume(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__PAUSE) {
    pTask->status.taskStatus = pTask->status.keepTaskStatus;
    pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
    qDebug("s-task:%s resume from pause", pTask->id.idStr);
  } else {
    qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
  }
}

892 893 894
// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
  // pre-condition check
895
  const char* id = pTask->id.idStr;
896
  while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
897
    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
898
    taosMsleep(100);
899 900
  }

901
  qDebug("s-task:%s disable task pause", id);
902 903 904 905 906 907
  pTask->status.pauseAllowed = 0;
}

void streamTaskEnablePause(SStreamTask* pTask) {
  qDebug("s-task:%s enable task pause", pTask->id.idStr);
  pTask->status.pauseAllowed = 1;
908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943
}

void streamTaskHalt(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    return;
  }

  if (status == TASK_STATUS__HALT) {
    return;
  }

  // upgrade to halt status
  if (status == TASK_STATUS__PAUSE) {
    qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
           streamGetTaskStatusStr(TASK_STATUS__PAUSE));
  } else {
    qDebug("s-task:%s halt task", pTask->id.idStr);
  }

  pTask->status.keepTaskStatus = status;
  pTask->status.taskStatus = TASK_STATUS__HALT;
}

void streamTaskResumeFromHalt(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int8_t status = pTask->status.taskStatus;
  if (status != TASK_STATUS__HALT) {
    qError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status));
    return;
  }

  pTask->status.taskStatus = pTask->status.keepTaskStatus;
  pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
  qDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
}