streamRecover.c 34.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

20 21 22 23 24 25 26 27 28
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;
  int32_t taskId;
} SStreamTaskRetryInfo;

static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void    launchFillHistoryTask(SStreamTask* pTask);
static void    streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
29

30
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
31 32 33
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;

34
  int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
35
  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
H
Haojun Liao 已提交
36
         pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
37 38
}

39
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
L
liuyao 已提交
40
  SStreamScanHistoryReq req;
41
  initScanHistoryReq(pTask, &req, igUntreated);
L
liuyao 已提交
42

43
  int32_t len = sizeof(SStreamScanHistoryReq);
L
liuyao 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    /*ASSERT(0);*/
  }

  return 0;
}

59 60 61
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
62
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
63
    case TASK_STATUS__HALT: return "halt";
64
    case TASK_STATUS__PAUSE: return "paused";
65
    case TASK_STATUS__DROPPING: return "dropping";
66 67 68
    default:return "";
  }
}
69

70 71
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pRange = &pTask->dataRange.range;
L
liuyao 已提交
72 73 74
  if (pTask->info.fillHistory) {
    streamSetParamForScanHistory(pTask);
  }
75

76 77
  streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
  int32_t code = streamStartScanHistoryAsync(pTask, 0);
L
liuyao 已提交
78
  return code;
79
}
80

81 82 83
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
84
      return doLaunchScanHistoryTask(pTask);
85 86 87 88 89 90
    } else {
      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
      qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
             walReaderGetCurrentVer(pTask->exec.pWalReader));
    }
91
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
L
liuyao 已提交
92 93 94
    if (pTask->info.fillHistory) {
      streamSetParamForScanHistory(pTask);
    }
95
    streamTaskEnablePause(pTask);
96
    streamTaskScanHistoryPrepare(pTask);
97
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
98
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
99
    streamTaskScanHistoryPrepare(pTask);
100 101 102 103
  }
  return 0;
}

104
// check status
105
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
106
  SHistDataRange* pRange = &pTask->dataRange;
107
  STimeWindow*    pWindow = &pRange->window;
108

109
  SStreamTaskCheckReq req = {
110 111
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
112 113
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
114
  };
115

116
  // serialize
117
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
118 119 120 121 122
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

123 124 125 126 127
    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

128
    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
129
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
130
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
131

132
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
133
    pTask->notReadyTasks = numOfVgs;
134
    pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
135

136 137 138
    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

139
    for (int32_t i = 0; i < numOfVgs; i++) {
140 141 142 143 144
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
145 146
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
147
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
148 149
    }
  } else {
150 151
    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);

152
    streamTaskSetReady(pTask, 0);
153
    streamTaskSetRangeStreamCalc(pTask);
154
    streamTaskLaunchScanHistory(pTask);
155

156
    launchFillHistoryTask(pTask);
157
  }
158

159 160 161
  return 0;
}

162
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
163 164 165 166 167 168 169 170 171
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };
172

173
  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
174
         req.downstreamTaskId, req.downstreamNodeId);
175

176
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
177
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
178
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
179
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
180 181 182

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
183 184
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo->taskId == req.downstreamTaskId) {
185
        streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
186 187 188
      }
    }
  }
189

190 191 192
  return 0;
}

193
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
194 195 196 197
  return (pTask->status.downstreamReady == 1)? 1:0;
}

static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
198
  streamTaskSetReady(pTask, numOfReqs);
199 200 201 202 203 204 205 206 207
  const char* id = pTask->id.idStr;

  int8_t      status = pTask->status.taskStatus;
  const char* str = streamGetTaskStatusStr(status);

  ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (status == TASK_STATUS__SCAN_HISTORY) {
208
    qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
209 210 211 212 213 214 215
    streamTaskLaunchScanHistory(pTask);
  } else {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
  }

  // when current stream task is ready, check the related fill history task.
  launchFillHistoryTask(pTask);
216 217
}

218
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
219
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
220
  const char* id = pTask->id.idStr;
221

222
  if (pRsp->status == 1) {
223
    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
224
      bool found = false;
225 226 227

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
      for (int32_t i = 0; i < numOfReqs; i++) {
228 229 230 231 232 233
        int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
        if (reqId == pRsp->reqId) {
          found = true;
          break;
        }
      }
234 235 236 237 238

      if (!found) {
        return -1;
      }

239
      int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
240
      ASSERT(left >= 0);
241

242 243
      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
L
Liu Jicong 已提交
244
        pTask->checkReqIds = NULL;
245 246

        doProcessDownstreamReadyRsp(pTask, numOfReqs);
247
      } else {
248 249 250
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
251
      }
252
    } else {
253
      ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
254 255 256 257
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

258
      doProcessDownstreamReadyRsp(pTask, 1);
259
    }
260 261 262
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
263
    taosMsleep(100);
264

265
    streamRecheckDownstream(pTask, pRsp);
266
  }
267

268 269 270
  return 0;
}

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo *pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

297
// common
298
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
299
  qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
300
  return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
301
}
302

303
int32_t streamRestoreParam(SStreamTask* pTask) {
304
  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
305
  return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
306
}
307

308
int32_t streamSetStatusNormal(SStreamTask* pTask) {
309 310 311 312 313 314 315 316 317
  int32_t status = atomic_load_8(&pTask->status.taskStatus);
  if (status == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  } else {
    qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    return 0;
  }
318 319 320
}

// source
L
liuyao 已提交
321 322 323 324 325 326
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
}

int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
327 328
}

329
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
330
  pReq->msgHead.vgId = pTask->info.nodeId;
331 332
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
L
liuyao 已提交
333
  pReq->igUntreated = igUntreated;
334 335 336
  return 0;
}

337
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
338 339 340
  return streamScanExec(pTask, 100);
}

341
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
342 343 344 345 346 347
  SStreamScanHistoryFinishReq req = {
      .streamId = pTask->id.streamId,
      .childId = pTask->info.selfChildId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->pMeta->vgId,
  };
H
Haojun Liao 已提交
348

L
Liu Jicong 已提交
349
  // serialize
350
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
351 352
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
353
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
354
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
355
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
356
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
357
    pTask->notReadyTasks = numOfVgs;
358

359
    qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
360 361
           numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
    for (int32_t i = 0; i < numOfVgs; i++) {
L
Liu Jicong 已提交
362
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
363
      req.downstreamTaskId = pVgInfo->taskId;
364
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
L
Liu Jicong 已提交
365
    }
366
  } else {
367
    qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
368
    streamProcessScanHistoryFinishRsp(pTask);
369
  }
370

371 372 373
  return 0;
}

H
Haojun Liao 已提交
374 375 376 377 378 379
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
380
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
H
Haojun Liao 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
396
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
397 398 399 400 401 402 403 404 405 406 407 408 409 410
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
H
Haojun Liao 已提交
411
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
412
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);
H
Haojun Liao 已提交
413 414 415 416 417 418 419 420

  return 0;
}

int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
421
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
422
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
H
Haojun Liao 已提交
423
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
424
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
H
Haojun Liao 已提交
425 426 427 428 429
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
430
      req.downstreamTaskId = pVgInfo->taskId;
H
Haojun Liao 已提交
431 432 433 434 435 436 437
      doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  }

  return 0;
}

438
// agg
439
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
440
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
441 442 443
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));
444 445 446
  return 0;
}

447
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
448
  void* exec = pTask->exec.pExecutor;
L
liuyao 已提交
449
  if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
450 451
    return -1;
  }
452

453 454 455 456 457 458
  if (qStreamRecoverFinish(exec) < 0) {
    return -1;
  }
  return 0;
}

459 460 461 462 463 464 465
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t taskLevel = pTask->info.taskLevel;
  ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);

  // sink node do not send end of scan history msg to its upstream, which is agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
466

467 468
  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);
469

470 471 472 473 474 475 476 477
  if (left == 0) {
    int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
    qDebug(
        "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
        "rsp to all upstream tasks",
        pTask->id.idStr, numOfTasks);

    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
478
      streamAggUpstreamScanHistoryFinish(pTask);
L
Liu Jicong 已提交
479
    }
480

481
    streamNotifyUpstreamContinue(pTask);
H
Haojun Liao 已提交
482

483
    // sink node does not receive the pause msg from mnode, so does not need enable it
H
Haojun Liao 已提交
484 485 486
    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamTaskEnablePause(pTask);
    }
487 488 489
  } else {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
490
  }
491

492 493 494
  return 0;
}

495 496 497 498 499 500
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  qResetStreamInfoTimeWindow(exec);
  return 0;
}

501 502 503 504 505 506 507 508 509 510
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  SStreamMeta* pMeta = pTask->pMeta;

  // execute in the scan history complete call back msg, ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
511
  streamMetaCommit(pMeta);
512 513
  taosWUnLockLatch(&pMeta->lock);

514
  // history data scan in the stream time window finished, now let's enable the pause
H
Haojun Liao 已提交
515 516
  streamTaskEnablePause(pTask);

517
  // for source tasks, let's continue execute.
518 519 520 521 522 523 524
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    streamSchedExec(pTask);
  }

  return TSDB_CODE_SUCCESS;
}

525 526 527 528
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;

529
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
530
    qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
531
           " ver range:%" PRId64 " - %" PRId64,
532 533 534 535 536
           pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
           pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
  } else {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  }
537 538

  // check if downstream tasks have been ready
539
  streamTaskDoCheckDownstreamTasks(pHTask);
540 541 542
}

static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
543 544 545
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

546
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
547 548 549 550 551

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
552

H
Haojun Liao 已提交
553
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
554 555 556
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
557
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
558 559 560 561 562 563
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
564

H
Haojun Liao 已提交
565 566 567
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
568

H
Haojun Liao 已提交
569 570
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
571 572 573
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
574 575
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
576
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
577

578
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
579
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
580 581 582
      return;
    }

583 584 585 586
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
587 588 589 590 591

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
592
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
593
  }
H
Haojun Liao 已提交
594 595

  taosMemoryFree(pInfo);
596 597 598 599
}

// todo fix the bug: 2. race condition
// an fill history task needs to be started.
600
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
601
  SStreamMeta* pMeta = pTask->pMeta;
H
Haojun Liao 已提交
602
  int32_t      hTaskId = pTask->historyTaskId.taskId;
603 604

  // Set the execute conditions, including the query time window and the version range
H
Haojun Liao 已提交
605
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
606 607
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
H
Haojun Liao 已提交
608
          pMeta->vgId, hTaskId);
609

H
Haojun Liao 已提交
610 611 612
    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
    pInfo->taskId = pTask->id.taskId;
    pInfo->pMeta = pTask->pMeta;
H
Haojun Liao 已提交
613

614 615 616
    if (pTask->launchTaskTimer == NULL) {
      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask,  100, pInfo, streamEnv.timer);
      if (pTask->launchTaskTimer == NULL) {
617
        // todo failed to create timer
H
Haojun Liao 已提交
618
        taosMemoryFree(pInfo);
H
Haojun Liao 已提交
619 620
      } else {
        pTask->status.timerActive = 1;  // timer is active
621
        qDebug("s-task:%s set timer active flag", pTask->id.idStr);
622
      }
H
Haojun Liao 已提交
623 624
    } else {  // timer exists
      pTask->status.timerActive = 1;
625 626
      qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
627 628 629 630 631 632 633 634 635 636
    }

    // try again in 500ms
    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}

637 638 639 640 641 642
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
L
liuyao 已提交
643 644 645 646 647 648
  int32_t code = 0;
  if (pTask->info.fillHistory) {
    code = streamRestoreParam(pTask);
    if (code < 0) {
      return -1;
    }
649 650
  }

651
  // dispatch scan-history finish req to all related downstream task
652
  code = streamDispatchScanHistoryFinishMsg(pTask);
653 654 655 656 657 658 659
  if (code < 0) {
    return -1;
  }

  return 0;
}

660
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
L
liuyao 已提交
661
  void* exec = pTask->exec.pExecutor;
662
  return qStreamInfoResetTimewindowFilter(exec);
L
liuyao 已提交
663 664
}

665
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
666 667 668 669 670 671
  SVersionRange* pRange = &pTask->dataRange.range;
  ASSERT(latestVer >= pRange->maxVer);

  int64_t nextStartVer = pRange->maxVer + 1;
  if (nextStartVer > latestVer - 1) {
    // no input data yet. no need to execute the secondardy scan while stream task halt
672
    streamTaskFillHistoryFinished(pTask);
673
    qDebug(
674 675 676 677
        "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
        "related stream task currentVer:%" PRId64,
        pTask->id.idStr, latestVer);
    return true;
678 679 680 681 682
  } else {
    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
    // [pTask->dataRange.range.maxVer, ver1]
    pRange->minVer = nextStartVer;
    pRange->maxVer = latestVer - 1;
683
    return false;
684 685 686
  }
}

687
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
688
  if (tStartEncode(pEncoder) < 0) return -1;
689
  if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
690
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
691
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
692 693 694 695
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
696 697 698 699
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

700
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
L
Liu Jicong 已提交
701
  if (tStartDecode(pDecoder) < 0) return -1;
702
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
L
Liu Jicong 已提交
703
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
704
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
705 706 707 708
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
709 710 711 712
  tEndDecode(pDecoder);
  return 0;
}

713
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
L
Liu Jicong 已提交
714
  if (tStartEncode(pEncoder) < 0) return -1;
715
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
L
Liu Jicong 已提交
716
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
717 718 719 720 721 722
  if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
  if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
L
Liu Jicong 已提交
723 724 725 726
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

727
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
L
Liu Jicong 已提交
728
  if (tStartDecode(pDecoder) < 0) return -1;
729 730 731 732 733 734 735 736
  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
  if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
L
Liu Jicong 已提交
737 738 739 740
  tEndDecode(pDecoder);
  return 0;
}

741
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
L
Liu Jicong 已提交
742 743
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
744 745 746
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
747
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
748 749 750
  tEndEncode(pEncoder);
  return pEncoder->pos;
}
751

752
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
L
Liu Jicong 已提交
753 754
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
755 756 757
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
758
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
759 760 761
  tEndDecode(pDecoder);
  return 0;
}
762

763 764 765
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  if (pTask->historyTaskId.taskId == 0) {
    SHistDataRange* pRange = &pTask->dataRange;
766 767 768 769 770 771 772 773 774
    if (pTask->info.fillHistory == 1) {
      qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
                 "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    } else {
      qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
             "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    }
775 776 777
  } else {
    SHistDataRange* pRange = &pTask->dataRange;

778 779 780 781 782 783 784
    int64_t ekey = 0;
    if (pRange->window.ekey < INT64_MAX) {
      ekey = pRange->window.ekey + 1;
    } else {
      ekey = pRange->window.ekey;
    }

785 786 787 788 789 790 791
    int64_t ver = pRange->range.minVer;

    pRange->window.skey = ekey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = ver;

792
    qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812
           ", verRang:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }
}

void launchFillHistoryTask(SStreamTask* pTask) {
  int32_t tId = pTask->historyTaskId.taskId;
  if (tId == 0) {
    return;
  }

  ASSERT(pTask->status.downstreamReady == 1);
  qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);

  // launch associated fill history task
  streamLaunchFillHistoryTask(pTask);
}

void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
813 814
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
815 816 817 818
    return;
  }

  ASSERT(pTask->status.downstreamReady == 0);
819

820 821
  // check downstream tasks for itself
  streamTaskDoCheckDownstreamTasks(pTask);
822
}
823

824
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
825 826 827 828
void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  int64_t st = taosGetTimestampMs();
829 830 831 832 833 834 835 836 837 838 839 840 841

  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING) {
    qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

  const char* str = streamGetTaskStatusStr(status);
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
    qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
    return;
  }

842
  while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
843 844
    status = pTask->status.taskStatus;
    if (status == TASK_STATUS__DROPPING) {
845 846 847 848
      qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
      return;
    }

849 850 851 852 853
    if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
      qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
      return;
    }

854 855
    const char* pStatus = streamGetTaskStatusStr(status);
    qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
856 857 858
    taosMsleep(100);
  }

859
  // todo: use the task lock, stead of meta lock
860 861 862 863 864 865 866 867 868
  taosWLockLatch(&pMeta->lock);

  status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    taosWUnLockLatch(&pMeta->lock);
    qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

869 870
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
871
  taosWUnLockLatch(&pMeta->lock);
872

873 874 875 876 877 878
  // in case of fill-history task, stop the tsdb file scan operation.
  if (pTask->info.fillHistory == 1) {
    void* pExecutor = pTask->exec.pExecutor;
    qKillTask(pExecutor, TSDB_CODE_SUCCESS);
  }

879 880
  int64_t el = taosGetTimestampMs() - st;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
H
Haojun Liao 已提交
881
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
882 883
}

884 885 886 887 888 889 890 891 892 893 894
void streamTaskResume(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__PAUSE) {
    pTask->status.taskStatus = pTask->status.keepTaskStatus;
    pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
    qDebug("s-task:%s resume from pause", pTask->id.idStr);
  } else {
    qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
  }
}

895 896 897
// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
  // pre-condition check
898
  const char* id = pTask->id.idStr;
899
  while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
900
    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
901
    taosMsleep(100);
902 903
  }

904
  qDebug("s-task:%s disable task pause", id);
905 906 907 908 909 910
  pTask->status.pauseAllowed = 0;
}

void streamTaskEnablePause(SStreamTask* pTask) {
  qDebug("s-task:%s enable task pause", pTask->id.idStr);
  pTask->status.pauseAllowed = 1;
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946
}

void streamTaskHalt(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    return;
  }

  if (status == TASK_STATUS__HALT) {
    return;
  }

  // upgrade to halt status
  if (status == TASK_STATUS__PAUSE) {
    qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
           streamGetTaskStatusStr(TASK_STATUS__PAUSE));
  } else {
    qDebug("s-task:%s halt task", pTask->id.idStr);
  }

  pTask->status.keepTaskStatus = status;
  pTask->status.taskStatus = TASK_STATUS__HALT;
}

void streamTaskResumeFromHalt(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int8_t status = pTask->status.taskStatus;
  if (status != TASK_STATUS__HALT) {
    qError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status));
    return;
  }

  pTask->status.taskStatus = pTask->status.keepTaskStatus;
  pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
  qDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
}