/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInt.h"
#include "ttimer.h"
#include "wal.h"

typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;
  int32_t taskId;
} SStreamTaskRetryInfo;

static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void    launchFillHistoryTask(SStreamTask* pTask);
static void    streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);

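// Mark this task's downstream as fully ready and log how long the downstream check took.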
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;

  int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
         pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
}

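// Build a scan-history request for this task and put it into its own stream queue, so the history scan is
// executed asynchronously.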
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
  SStreamScanHistoryReq req;
  initScanHistoryReq(pTask, &req, igUntreated);

  int32_t len = sizeof(SStreamScanHistoryReq);
  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    /*ASSERT(0);*/
  }

  return 0;
}

const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
    case TASK_STATUS__HALT: return "halt";
    case TASK_STATUS__PAUSE: return "paused";
    case TASK_STATUS__DROPPING: return "dropping";
    default:return "";
  }
}

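// Set the executor parameters for the step-1 history scan (version range and time window), then start the scan
// asynchronously.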
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pRange = &pTask->dataRange.range;
  if (pTask->info.fillHistory) {
    streamSetParamForScanHistory(pTask);
  }

  streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
  int32_t code = streamStartScanHistoryAsync(pTask, 0);
  return code;
}

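// Launch the scan-history stage according to the task level: a source task starts scanning by itself, while agg
// and sink tasks only wait for their upstream tasks to complete the scan.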
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      return doLaunchScanHistoryTask(pTask);
    } else {
      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
      qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
             walReaderGetCurrentVer(pTask->exec.pWalReader));
    }
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    if (pTask->info.fillHistory) {
      streamSetParamForScanHistory(pTask);
    }
    streamTaskEnablePause(pTask);
    streamTaskScanHistoryPrepare(pTask);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
    streamTaskScanHistoryPrepare(pTask);
  }
  return 0;
}

// check the readiness status of all downstream tasks before this task starts
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  STimeWindow*    pWindow = &pRange->window;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  // serialize
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    pTask->notReadyTasks = numOfVgs;
    pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));

    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);

    streamTaskSetReady(pTask, 0);
    streamTaskSetRangeStreamCalc(pTask);
    streamTaskLaunchScanHistory(pTask);

    launchFillHistoryTask(pTask);
  }

  return 0;
}

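// Resend the check request to the downstream task that reported itself not ready.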
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo->taskId == req.downstreamTaskId) {
        streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
      }
    }
  }

  return 0;
}

int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  return (pTask->status.downstreamReady == 1)? 1:0;
}

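// Invoked once every downstream task has confirmed readiness: mark the task ready, then either enter the
// scan-history stage or start to consume data from the WAL, and finally launch the related fill-history task.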
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
  streamTaskSetReady(pTask, numOfReqs);
  const char* id = pTask->id.idStr;

  int8_t      status = pTask->status.taskStatus;
  const char* str = streamGetTaskStatusStr(status);

  ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (status == TASK_STATUS__SCAN_HISTORY) {
    qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
    streamTaskLaunchScanHistory(pTask);
  } else {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
  }

  // when current stream task is ready, check the related fill history task.
  launchFillHistoryTask(pTask);
}

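// Handle the check rsp from one downstream task. For shuffle dispatch, wait until all downstream tasks have
// replied; if the downstream task is not ready yet, wait 100ms and send the check request again.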
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* id = pTask->id.idStr;

  if (pRsp->status == 1) {
    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      bool found = false;

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
      for (int32_t i = 0; i < numOfReqs; i++) {
        int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
        if (reqId == pRsp->reqId) {
          found = true;
          break;
        }
      }

      if (!found) {
        return -1;
      }

      int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
      ASSERT(left >= 0);

      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
        pTask->checkReqIds = NULL;

        doProcessDownstreamReadyRsp(pTask, numOfReqs);
      } else {
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
      }
    } else {
      ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

      doProcessDownstreamReadyRsp(pTask, 1);
    }
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
  }

  return 0;
}

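// Encode the check rsp and send it back to the upstream task through the original rpc handle.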
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo *pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

// common
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
  return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
}

int32_t streamRestoreParam(SStreamTask* pTask) {
  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
  return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
}

int32_t streamSetStatusNormal(SStreamTask* pTask) {
  int32_t status = atomic_load_8(&pTask->status.taskStatus);
  if (status == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  } else {
    qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    return 0;
  }
}

// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
}

int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
}

int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  pReq->msgHead.vgId = pTask->info.nodeId;
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
  pReq->igUntreated = igUntreated;
  return 0;
}

int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  return streamScanExec(pTask, 100);
}

int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamScanHistoryFinishReq req = {
      .streamId = pTask->id.streamId,
      .childId = pTask->info.selfChildId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->pMeta->vgId,
  };

  // serialize
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    pTask->notReadyTasks = numOfVgs;

    qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
           numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.downstreamTaskId = pVgInfo->taskId;
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
    streamProcessScanHistoryFinishRsp(pTask);
  }

  return 0;
}

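// Encode a transfer-state request and send it to the specified downstream task; no response is expected.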
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);

  return 0;
}

int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.downstreamTaskId = pVgInfo->taskId;
      doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  }

  return 0;
}

// agg
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));
  return 0;
}

int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
    return -1;
  }

  if (qStreamRecoverFinish(exec) < 0) {
    return -1;
  }
  return 0;
}

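// Handle the scan-history finish msg from one upstream task. The rsp is delayed until every upstream task has
// finished; when the last one arrives, restore the agg executor for stream data and notify all upstream tasks
// to continue.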
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t taskLevel = pTask->info.taskLevel;
  ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);

  // sink node do not send end of scan history msg to its upstream, which is agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);

  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);

  if (left == 0) {
    int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
    qDebug(
        "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
        "rsp to all upstream tasks",
        pTask->id.idStr, numOfTasks);

    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamAggUpstreamScanHistoryFinish(pTask);
    }

    streamNotifyUpstreamContinue(pTask);

    // sink nodes do not receive the pause msg from the mnode, so there is no need to enable pause for them
    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamTaskEnablePause(pTask);
    }
  } else {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
  }

  return 0;
}

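// Leave the scan-history stage: set the task status back to normal, persist the task in the meta store, enable
// pause, and let source tasks resume executing data from the input queue.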
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  SStreamMeta* pMeta = pTask->pMeta;

  // executed when handling the scan-history-complete msg; the task is now ready to process data from the inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  streamMetaCommit(pMeta);
  taosWUnLockLatch(&pMeta->lock);

  // the history data scan within the stream time window has finished, now enable the pause again
  streamTaskEnablePause(pTask);

  // for source tasks, continue execution.
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    streamSchedExec(pTask);
  }

  return TSDB_CODE_SUCCESS;
}

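// Set the version range of the fill-history task (and log the launch condition for source tasks), then check
// whether its downstream tasks are ready.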
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
           pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
  } else {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  }

  // check whether the downstream tasks are ready
  streamTaskDoCheckDownstreamTasks(pHTask);
}

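// Timer callback that retries launching the related fill-history task: if the history task is not registered in
// the meta store yet, reset the timer and try again later; quit the timer once the task should stop.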
static void tryLaunchHistoryTask(void* param, void* tmrId) {
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);

    if (streamTaskShouldStop(&(*ppTask)->status)) {
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

      taosMemoryFree(pInfo);
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);

    // abort the timer if the task is intended to stop
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);

      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
      streamMetaReleaseTask(pMeta, pTask);
      return;
    }

    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
  }

  taosMemoryFree(pInfo);
}

// todo: fix the race condition
// a fill-history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

  // Set the execution conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
          pMeta->vgId, hTaskId);

    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
    pInfo->taskId = pTask->id.taskId;
    pInfo->pMeta = pTask->pMeta;

    if (pTask->launchTaskTimer == NULL) {
      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask,  100, pInfo, streamEnv.timer);
      if (pTask->launchTaskTimer == NULL) {
        // todo: handle the failure to create the timer
        taosMemoryFree(pInfo);
      } else {
        pTask->status.timerActive = 1;  // timer is active
        qDebug("s-task:%s set timer active flag", pTask->id.idStr);
      }
    } else {  // timer exists
      pTask->status.timerActive = 1;
      qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
    }

    // try again in 100ms
    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}

int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
  int32_t code = 0;
  if (pTask->info.fillHistory) {
    code = streamRestoreParam(pTask);
    if (code < 0) {
      return -1;
    }
  }

  // dispatch scan-history finish req to all related downstream task
  code = streamDispatchScanHistoryFinishMsg(pTask);
  if (code < 0) {
    return -1;
  }

  return 0;
}

int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamInfoResetTimewindowFilter(exec);
}

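// Decide whether the step-2 history scan is needed. Returns true (and finishes the fill-history task) when no
// data was ingested during step 1; otherwise updates the version range for the secondary scan and returns false.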
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
  SVersionRange* pRange = &pTask->dataRange.range;
  ASSERT(latestVer >= pRange->maxVer);

  int64_t nextStartVer = pRange->maxVer + 1;
  if (nextStartVer > latestVer - 1) {
    // no input data yet; no need to execute the secondary scan while the stream task is halted
    streamTaskFillHistoryFinished(pTask);
    qDebug(
        "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
        "related stream task currentVer:%" PRId64,
        pTask->id.idStr, latestVer);
    return true;
  } else {
    // do the secondary scan of the history data: the time window remains unchanged, and the version range is
    // updated to [pRange->maxVer + 1, latestVer - 1]
    pRange->minVer = nextStartVer;
    pRange->maxVer = latestVer - 1;
    return false;
  }
}

int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
  if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
  if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

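// Set the time window and version range used by this task for stream calculation. When a related fill-history
// task exists, the stream task's time window is moved to start right after the fill-history window.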
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  if (pTask->historyTaskId.taskId == 0) {
    SHistDataRange* pRange = &pTask->dataRange;
    if (pTask->info.fillHistory == 1) {
      qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
                 "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    } else {
      qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
             "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    }
  } else {
    SHistDataRange* pRange = &pTask->dataRange;

    int64_t ekey = 0;
    if (pRange->window.ekey < INT64_MAX) {
      ekey = pRange->window.ekey + 1;
    } else {
      ekey = pRange->window.ekey;
    }

    int64_t ver = pRange->range.minVer;

    pRange->window.skey = ekey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = ver;

    qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
           ", verRang:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }
}

void launchFillHistoryTask(SStreamTask* pTask) {
  int32_t tId = pTask->historyTaskId.taskId;
  if (tId == 0) {
    return;
  }

  ASSERT(pTask->status.downstreamReady == 1);
  qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);

  // launch associated fill history task
  streamLaunchFillHistoryTask(pTask);
}

void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
    return;
  }

  ASSERT(pTask->status.downstreamReady == 0);

  // check downstream tasks for itself
  streamTaskDoCheckDownstreamTasks(pTask);
}

// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  int64_t st = taosGetTimestampMs();

  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING) {
    qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

  const char* str = streamGetTaskStatusStr(status);
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
    qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
    return;
  }

  while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
    status = pTask->status.taskStatus;
    if (status == TASK_STATUS__DROPPING) {
      qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
      return;
    }

    if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
      qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
      return;
    }

    const char* pStatus = streamGetTaskStatusStr(status);
    qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
    taosMsleep(100);
  }

  // todo: use the task lock instead of the meta lock
  taosWLockLatch(&pMeta->lock);

  status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    taosWUnLockLatch(&pMeta->lock);
    qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  taosWUnLockLatch(&pMeta->lock);

  // in case of fill-history task, stop the tsdb file scan operation.
  if (pTask->info.fillHistory == 1) {
    void* pExecutor = pTask->exec.pExecutor;
    qKillTask(pExecutor, TSDB_CODE_SUCCESS);
  }

  int64_t el = taosGetTimestampMs() - st;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
}

void streamTaskResume(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__PAUSE) {
    pTask->status.taskStatus = pTask->status.keepTaskStatus;
    pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
    qDebug("s-task:%s resume from pause", pTask->id.idStr);
  } else {
    qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
  }
}

// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
  // pre-condition check
  const char* id = pTask->id.idStr;
  while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
    taosMsleep(100);
  }

  qDebug("s-task:%s disable task pause", id);
  pTask->status.pauseAllowed = 0;
}

void streamTaskEnablePause(SStreamTask* pTask) {
  qDebug("s-task:%s enable task pause", pTask->id.idStr);
  pTask->status.pauseAllowed = 1;
}

void streamTaskHalt(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    return;
  }

  if (status == TASK_STATUS__HALT) {
    return;
  }

  // upgrade to halt status
  if (status == TASK_STATUS__PAUSE) {
    qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
           streamGetTaskStatusStr(TASK_STATUS__PAUSE));
  } else {
    qDebug("s-task:%s halt task", pTask->id.idStr);
  }

  pTask->status.keepTaskStatus = status;
  pTask->status.taskStatus = TASK_STATUS__HALT;
}

void streamTaskResumeFromHalt(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int8_t status = pTask->status.taskStatus;
  if (status != TASK_STATUS__HALT) {
    qError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status));
    return;
  }

  pTask->status.taskStatus = pTask->status.keepTaskStatus;
  pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
  qDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
}