streamRecover.c 33.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

20 21 22 23 24 25
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);

static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;
H
Haojun Liao 已提交
26
  int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
27 28

  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
H
Haojun Liao 已提交
29
         pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
30 31
}

L
liuyao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  SStreamScanHistoryReq req;
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);
  int32_t len = sizeof(SStreamScanHistoryReq);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    /*ASSERT(0);*/
  }

  return 0;
}

52 53 54
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
55
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
56
    case TASK_STATUS__HALT: return "halt";
57
    case TASK_STATUS__PAUSE: return "paused";
58
    case TASK_STATUS__DROPPING: return "dropping";
59 60 61
    default:return "";
  }
}
62

63 64
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pRange = &pTask->dataRange.range;
L
liuyao 已提交
65 66 67
  if (pTask->info.fillHistory) {
    streamSetParamForScanHistory(pTask);
  }
L
liuyao 已提交
68
  streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
69

L
liuyao 已提交
70 71
  int32_t code = streamStartRecoverTask(pTask, 0);
  return code;
72
}
73

74 75 76
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
77
      return doLaunchScanHistoryTask(pTask);
78 79 80 81 82 83
    } else {
      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
      qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
             walReaderGetCurrentVer(pTask->exec.pWalReader));
    }
84
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
L
liuyao 已提交
85 86 87
    if (pTask->info.fillHistory) {
      streamSetParamForScanHistory(pTask);
    }
88
    streamTaskEnablePause(pTask);
89
    streamTaskScanHistoryPrepare(pTask);
90
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
91
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
92
    streamTaskScanHistoryPrepare(pTask);
93 94 95 96
  }
  return 0;
}

97
// check status
98
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
99
  SHistDataRange* pRange = &pTask->dataRange;
100
  STimeWindow*    pWindow = &pRange->window;
101

102
  SStreamTaskCheckReq req = {
103 104
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
105 106
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
107
  };
108

109
  // serialize
110
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
111 112 113 114 115
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

116 117 118 119 120
    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

121
    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
122
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
123
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
124

125
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
126
    pTask->notReadyTasks = numOfVgs;
127
    pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
128

129 130 131
    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

132
    for (int32_t i = 0; i < numOfVgs; i++) {
133 134 135 136 137
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
138 139
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
140
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
141 142
    }
  } else {
143 144 145 146
    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);

    streamTaskSetForReady(pTask, 0);
    streamTaskSetRangeStreamCalc(pTask);
147
    streamTaskLaunchScanHistory(pTask);
148

149
    launchFillHistoryTask(pTask);
150
  }
151

152 153 154
  return 0;
}

155
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
156 157 158 159 160 161 162 163 164
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };
165

166
  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
167
         req.downstreamTaskId, req.downstreamNodeId);
168

169
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
170
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
171
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
172
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
173 174 175

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
176 177
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo->taskId == req.downstreamTaskId) {
178
        streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
179 180 181
      }
    }
  }
182

183 184 185
  return 0;
}

186
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
187 188 189 190 191 192 193 194 195 196 197 198 199 200
  return (pTask->status.downstreamReady == 1)? 1:0;
}

static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
  streamTaskSetForReady(pTask, numOfReqs);
  const char* id = pTask->id.idStr;

  int8_t      status = pTask->status.taskStatus;
  const char* str = streamGetTaskStatusStr(status);

  ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (status == TASK_STATUS__SCAN_HISTORY) {
201
    qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
202 203 204 205 206 207 208
    streamTaskLaunchScanHistory(pTask);
  } else {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
  }

  // when current stream task is ready, check the related fill history task.
  launchFillHistoryTask(pTask);
209 210
}

211
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
212
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
213
  const char* id = pTask->id.idStr;
214

215
  if (pRsp->status == 1) {
216
    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
217
      bool found = false;
218 219 220

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
      for (int32_t i = 0; i < numOfReqs; i++) {
221 222 223 224 225 226
        int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
        if (reqId == pRsp->reqId) {
          found = true;
          break;
        }
      }
227 228 229 230 231

      if (!found) {
        return -1;
      }

232
      int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
233
      ASSERT(left >= 0);
234

235 236
      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
L
Liu Jicong 已提交
237
        pTask->checkReqIds = NULL;
238 239

        doProcessDownstreamReadyRsp(pTask, numOfReqs);
240
      } else {
241 242 243
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
244
      }
245
    } else {
246
      ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
247 248 249 250
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

251
      doProcessDownstreamReadyRsp(pTask, 1);
252
    }
253 254 255
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
256
    taosMsleep(100);
257

258
    streamRecheckDownstream(pTask, pRsp);
259
  }
260

261 262 263
  return 0;
}

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo *pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

290
// common
291
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
292
  qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
293
  return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
294
}
295

296
int32_t streamRestoreParam(SStreamTask* pTask) {
297
  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
298
  return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
299
}
300

301
int32_t streamSetStatusNormal(SStreamTask* pTask) {
302 303 304 305 306 307 308 309 310
  int32_t status = atomic_load_8(&pTask->status.taskStatus);
  if (status == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  } else {
    qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    return 0;
  }
311 312 313
}

// source
L
liuyao 已提交
314 315 316 317 318 319
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
}

int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
320 321
}

L
liuyao 已提交
322
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
323
  pReq->msgHead.vgId = pTask->info.nodeId;
324 325
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;
L
liuyao 已提交
326
  pReq->igUntreated = igUntreated;
327 328 329
  return 0;
}

330
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
331 332 333
  return streamScanExec(pTask, 100);
}

334
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
335 336 337 338 339 340
  SStreamScanHistoryFinishReq req = {
      .streamId = pTask->id.streamId,
      .childId = pTask->info.selfChildId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->pMeta->vgId,
  };
H
Haojun Liao 已提交
341

L
Liu Jicong 已提交
342
  // serialize
343
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
344 345
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
346
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
347
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
348
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
349
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
350
    pTask->notReadyTasks = numOfVgs;
351

352
    qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
353 354
           numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
    for (int32_t i = 0; i < numOfVgs; i++) {
L
Liu Jicong 已提交
355
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
356
      req.downstreamTaskId = pVgInfo->taskId;
357
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
L
Liu Jicong 已提交
358
    }
359
  } else {
360
    qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
361
    streamProcessScanHistoryFinishRsp(pTask);
362
  }
363

364 365 366
  return 0;
}

H
Haojun Liao 已提交
367 368 369 370 371 372
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
373
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
H
Haojun Liao 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
389
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
390 391 392 393 394 395 396 397 398 399 400 401 402 403
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
H
Haojun Liao 已提交
404
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
405
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);
H
Haojun Liao 已提交
406 407 408 409 410 411 412 413

  return 0;
}

int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
414
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
415
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
H
Haojun Liao 已提交
416
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
417
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
H
Haojun Liao 已提交
418 419 420 421 422
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
423
      req.downstreamTaskId = pVgInfo->taskId;
H
Haojun Liao 已提交
424 425 426 427 428 429 430
      doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  }

  return 0;
}

431
// agg
432
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
433
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
434 435 436
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));
437 438 439
  return 0;
}

440
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
441
  void* exec = pTask->exec.pExecutor;
L
liuyao 已提交
442
  if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
443 444
    return -1;
  }
445

446 447 448 449 450 451
  if (qStreamRecoverFinish(exec) < 0) {
    return -1;
  }
  return 0;
}

452 453 454 455 456 457 458
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t taskLevel = pTask->info.taskLevel;
  ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);

  // sink node do not send end of scan history msg to its upstream, which is agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
459

460 461
  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);
462

463 464 465 466 467 468 469 470
  if (left == 0) {
    int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
    qDebug(
        "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
        "rsp to all upstream tasks",
        pTask->id.idStr, numOfTasks);

    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
471
      streamAggUpstreamScanHistoryFinish(pTask);
L
Liu Jicong 已提交
472
    }
473

474
    streamNotifyUpstreamContinue(pTask);
H
Haojun Liao 已提交
475

476
    // sink node does not receive the pause msg from mnode, so does not need enable it
H
Haojun Liao 已提交
477 478 479
    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamTaskEnablePause(pTask);
    }
480 481 482
  } else {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
483
  }
484

485 486 487
  return 0;
}

488 489 490 491 492 493 494 495 496 497 498 499
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  SStreamMeta* pMeta = pTask->pMeta;

  // execute in the scan history complete call back msg, ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  taosWUnLockLatch(&pMeta->lock);

500
  // history data scan in the stream time window finished, now let's enable the pause
H
Haojun Liao 已提交
501 502
  streamTaskEnablePause(pTask);

503 504 505 506 507 508 509
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    streamSchedExec(pTask);
  }

  return TSDB_CODE_SUCCESS;
}

510 511 512 513
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;

514
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
515
    qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
516
           " ver range:%" PRId64 " - %" PRId64,
517 518 519 520 521
           pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
           pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
  } else {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  }
522 523

  // check if downstream tasks have been ready
524
  streamTaskDoCheckDownstreamTasks(pHTask);
525 526
}

H
Haojun Liao 已提交
527 528 529 530 531
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;
  int32_t taskId;
} SStreamTaskRetryInfo;

532
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
533 534 535
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

536
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
537 538 539 540 541

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
542

H
Haojun Liao 已提交
543
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
544 545 546
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
547
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
548 549 550 551 552 553
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
554

H
Haojun Liao 已提交
555 556 557
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
558

H
Haojun Liao 已提交
559 560
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
561 562 563
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
564 565
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
566
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
567

568
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
569
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
570 571 572
      return;
    }

573 574 575 576
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
577 578 579 580 581

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
582
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
583
  }
H
Haojun Liao 已提交
584 585

  taosMemoryFree(pInfo);
586 587 588 589
}

// todo fix the bug: 2. race condition
// an fill history task needs to be started.
590
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
591
  SStreamMeta* pMeta = pTask->pMeta;
H
Haojun Liao 已提交
592
  int32_t      hTaskId = pTask->historyTaskId.taskId;
593 594

  // Set the execute conditions, including the query time window and the version range
H
Haojun Liao 已提交
595
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
596 597
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
H
Haojun Liao 已提交
598
          pMeta->vgId, hTaskId);
599

H
Haojun Liao 已提交
600 601 602
    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
    pInfo->taskId = pTask->id.taskId;
    pInfo->pMeta = pTask->pMeta;
H
Haojun Liao 已提交
603

604 605 606
    if (pTask->launchTaskTimer == NULL) {
      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask,  100, pInfo, streamEnv.timer);
      if (pTask->launchTaskTimer == NULL) {
607
        // todo failed to create timer
H
Haojun Liao 已提交
608
        taosMemoryFree(pInfo);
H
Haojun Liao 已提交
609 610
      } else {
        pTask->status.timerActive = 1;  // timer is active
611
        qDebug("s-task:%s set timer active flag", pTask->id.idStr);
612
      }
H
Haojun Liao 已提交
613 614
    } else {  // timer exists
      pTask->status.timerActive = 1;
615 616
      qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
617 618 619 620 621 622 623 624 625 626
    }

    // try again in 500ms
    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}

627 628 629 630 631 632
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
L
liuyao 已提交
633 634 635 636 637 638
  int32_t code = 0;
  if (pTask->info.fillHistory) {
    code = streamRestoreParam(pTask);
    if (code < 0) {
      return -1;
    }
639 640 641
  }

  // dispatch recover finish req to all related downstream task
642
  code = streamDispatchScanHistoryFinishMsg(pTask);
643 644 645 646 647 648 649
  if (code < 0) {
    return -1;
  }

  return 0;
}

L
liuyao 已提交
650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamRecoverScanStep1Finished(exec);
}

bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamRecoverScanStep2Finished(exec);
}

int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  return qStreamRecoverSetAllStepFinished(exec);
}

665
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
666 667 668 669 670 671 672 673
  SVersionRange* pRange = &pTask->dataRange.range;
  ASSERT(latestVer >= pRange->maxVer);

  int64_t nextStartVer = pRange->maxVer + 1;
  if (nextStartVer > latestVer - 1) {
    // no input data yet. no need to execute the secondardy scan while stream task halt
    streamTaskRecoverSetAllStepFinished(pTask);
    qDebug(
674 675 676 677
        "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
        "related stream task currentVer:%" PRId64,
        pTask->id.idStr, latestVer);
    return true;
678 679 680 681 682
  } else {
    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
    // [pTask->dataRange.range.maxVer, ver1]
    pRange->minVer = nextStartVer;
    pRange->maxVer = latestVer - 1;
683
    return false;
684 685 686 687
  }
}


688
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
689
  if (tStartEncode(pEncoder) < 0) return -1;
690
  if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
691
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
692
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
693 694 695 696
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
697 698 699 700
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

701
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
L
Liu Jicong 已提交
702
  if (tStartDecode(pDecoder) < 0) return -1;
703
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
L
Liu Jicong 已提交
704
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
705
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
706 707 708 709
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
710 711 712 713
  tEndDecode(pDecoder);
  return 0;
}

714
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
L
Liu Jicong 已提交
715
  if (tStartEncode(pEncoder) < 0) return -1;
716
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
L
Liu Jicong 已提交
717
  if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
718 719 720 721 722 723
  if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
  if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
L
Liu Jicong 已提交
724 725 726 727
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

728
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
L
Liu Jicong 已提交
729
  if (tStartDecode(pDecoder) < 0) return -1;
730 731 732 733 734 735 736 737
  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
  if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
L
Liu Jicong 已提交
738 739 740 741
  tEndDecode(pDecoder);
  return 0;
}

742
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
L
Liu Jicong 已提交
743 744
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
745 746 747
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
748
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
749 750 751
  tEndEncode(pEncoder);
  return pEncoder->pos;
}
752

753
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
L
Liu Jicong 已提交
754 755
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
756 757 758
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
759
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
L
Liu Jicong 已提交
760 761 762
  tEndDecode(pDecoder);
  return 0;
}
763

764 765 766
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  if (pTask->historyTaskId.taskId == 0) {
    SHistDataRange* pRange = &pTask->dataRange;
767 768 769 770 771 772 773 774 775
    if (pTask->info.fillHistory == 1) {
      qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
                 "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    } else {
      qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
             "-%" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    }
776 777 778
  } else {
    SHistDataRange* pRange = &pTask->dataRange;

779 780 781 782 783 784 785
    int64_t ekey = 0;
    if (pRange->window.ekey < INT64_MAX) {
      ekey = pRange->window.ekey + 1;
    } else {
      ekey = pRange->window.ekey;
    }

786 787 788 789 790 791 792
    int64_t ver = pRange->range.minVer;

    pRange->window.skey = ekey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = ver;

793
    qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813
           ", verRang:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }
}

void launchFillHistoryTask(SStreamTask* pTask) {
  int32_t tId = pTask->historyTaskId.taskId;
  if (tId == 0) {
    return;
  }

  ASSERT(pTask->status.downstreamReady == 1);
  qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);

  // launch associated fill history task
  streamLaunchFillHistoryTask(pTask);
}

void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
814 815
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
816 817 818 819
    return;
  }

  ASSERT(pTask->status.downstreamReady == 0);
820

821 822
  // check downstream tasks for itself
  streamTaskDoCheckDownstreamTasks(pTask);
823
}
824

825
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
826 827 828 829
void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  int64_t st = taosGetTimestampMs();
830 831 832 833 834 835 836 837 838 839 840 841 842

  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING) {
    qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

  const char* str = streamGetTaskStatusStr(status);
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
    qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
    return;
  }

843
  while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
844 845
    status = pTask->status.taskStatus;
    if (status == TASK_STATUS__DROPPING) {
846 847 848 849
      qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
      return;
    }

850 851 852 853 854
    if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
      qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
      return;
    }

855 856
    const char* pStatus = streamGetTaskStatusStr(status);
    qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
857 858 859
    taosMsleep(100);
  }

860 861 862 863 864 865 866 867 868 869
  // todo: use the lock of the task.
  taosWLockLatch(&pMeta->lock);

  status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    taosWUnLockLatch(&pMeta->lock);
    qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

870 871
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
872
  taosWUnLockLatch(&pMeta->lock);
873 874 875

  int64_t el = taosGetTimestampMs() - st;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
H
Haojun Liao 已提交
876
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
877 878
}

879 880 881 882 883 884 885 886 887 888 889
void streamTaskResume(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__PAUSE) {
    pTask->status.taskStatus = pTask->status.keepTaskStatus;
    pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
    qDebug("s-task:%s resume from pause", pTask->id.idStr);
  } else {
    qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
  }
}

890 891 892
// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
  // pre-condition check
893
  const char* id = pTask->id.idStr;
894
  while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
895
    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
896
    taosMsleep(100);
897 898
  }

899
  qDebug("s-task:%s disable task pause", id);
900 901 902 903 904 905
  pTask->status.pauseAllowed = 0;
}

void streamTaskEnablePause(SStreamTask* pTask) {
  qDebug("s-task:%s enable task pause", pTask->id.idStr);
  pTask->status.pauseAllowed = 1;
906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
}

void streamTaskHalt(SStreamTask* pTask) {
  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    return;
  }

  if (status == TASK_STATUS__HALT) {
    return;
  }

  // upgrade to halt status
  if (status == TASK_STATUS__PAUSE) {
    qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
           streamGetTaskStatusStr(TASK_STATUS__PAUSE));
  } else {
    qDebug("s-task:%s halt task", pTask->id.idStr);
  }

  pTask->status.keepTaskStatus = status;
  pTask->status.taskStatus = TASK_STATUS__HALT;
}

void streamTaskResumeFromHalt(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int8_t status = pTask->status.taskStatus;
  if (status != TASK_STATUS__HALT) {
    qError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status));
    return;
  }

  pTask->status.taskStatus = pTask->status.keepTaskStatus;
  pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
  qDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
}