streamRecover.c 30.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

20 21 22 23 24 25 26 27 28
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);

/**
 * Flag the task as "all downstream tasks ready" and log how long it took since pTask->initTs.
 *
 * @param pTask      task to update; its downstreamReady flag must still be 0 on entry
 * @param numOfReqs  number of downstream check requests, used for logging only
 */
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;

  // milliseconds elapsed since the task recorded its init timestamp
  int64_t     elapsedMs = taosGetTimestampMs() - pTask->initTs;
  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);

  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
         pTask->id.idStr, numOfReqs, (int32_t)elapsedMs, pStatus);
}

L
liuyao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
/**
 * Build a scan-history request for this task and put it into the stream queue.
 *
 * @param pTask        task for which the scan-history request is generated
 * @param igUntreated  value copied into the request's igUntreated field
 * @return 0 when the request has been queued, -1 if the message buffer cannot be
 *         allocated or the request cannot be put into the queue
 */
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  // zero-initialize: the whole struct is copied byte-wise into the message below, so
  // fields not set by streamBuildSourceRecover1Req() and padding must not be garbage
  SStreamScanHistoryReq req = {0};
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);
  int32_t len = sizeof(SStreamScanHistoryReq);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    // NOTE(review): ownership of serializedReq after a failed put is not visible here,
    // so it is deliberately not freed in this path -- confirm against tmsgPutToQueue().
    qError("s-task:%s failed to put scan-history req into stream queue", pTask->id.idStr);
    return -1;
  }

  return 0;
}

52 53 54
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
55
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
56
    case TASK_STATUS__HALT: return "halt";
57
    case TASK_STATUS__PAUSE: return "paused";
58
    case TASK_STATUS__DROPPING: return "dropping";
59 60 61
    default:return "";
  }
}
62

63 64
/**
 * Configure the executor for scan-history (step 1) using the task's version range and
 * time window, then queue the scan-history request.
 *
 * @return the result of streamStartRecoverTask()
 */
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SHistDataRange* pDataRange = &pTask->dataRange;

  streamSetParamForScanHistory(pTask);
  streamSetParamForStreamScannerStep1(pTask, &pDataRange->range, &pDataRange->window);

  return streamStartRecoverTask(pTask, 0);
}
71

72 73 74
/**
 * Start the scan-history stage in the way that matches the task level:
 *  - source task: launch the scan when the status is scan-history, otherwise only log;
 *  - agg task: set the scan-history operator option and prepare the upstream counter;
 *  - sink task: only prepare the upstream counter.
 *
 * @return the result of doLaunchScanHistoryTask() for a source task in scan-history
 *         status, 0 in every other case
 */
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      return doLaunchScanHistoryTask(pTask);
    }

    // a source task that is not scanning history is expected to be in normal status
    ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
    qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
           streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
           walReaderGetCurrentVer(pTask->exec.pWalReader));
    return 0;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamSetParamForScanHistory(pTask);
    streamTaskScanHistoryPrepare(pTask);
    return 0;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
    streamTaskScanHistoryPrepare(pTask);
  }

  return 0;
}

92
// check status
93
/**
 * Send a check request to every downstream task of pTask.
 *
 *  - fixed dispatch:   one request, its id is remembered in pTask->checkReqId;
 *  - shuffle dispatch: one request per vgroup, ids are collected in pTask->checkReqIds and
 *                      pTask->notReadyTasks is set to the number of vgroups;
 *  - otherwise:        there is no downstream, so the task is marked ready right away,
 *                      scan-history is launched and the related fill-history task started.
 *
 * @return always 0
 */
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
  const char*     id = pTask->id.idStr;
  SHistDataRange* pDataRange = &pTask->dataRange;
  SVersionRange*  pVerRange = &pDataRange->range;
  STimeWindow*    pWin = &pDataRange->window;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;  // remembered to match the response later

    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           id, req.downstreamTaskId, req.downstreamNodeId, pVerRange->minVer, pVerRange->maxVer, pWin->skey,
           pWin->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(pVgList);

    pTask->notReadyTasks = numOfVgs;
    pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));

    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, id,
           numOfVgs, pVerRange->minVer, pVerRange->maxVer, pWin->skey, pWin->ekey);

    for (int32_t idx = 0; idx < numOfVgs; ++idx) {
      SVgroupInfo* pVg = taosArrayGet(pVgList, idx);

      // each downstream task gets its own request id
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVg->vgId;
      req.downstreamTaskId = pVg->taskId;

      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", id, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, idx);
      streamDispatchCheckMsg(pTask, &req, pVg->vgId, &pVg->epSet);
    }

    return 0;
  }

  // no downstream at all: nothing to wait for
  qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", id, pTask->info.nodeId);

  streamTaskSetForReady(pTask, 0);
  streamTaskSetRangeStreamCalc(pTask);
  streamTaskLaunchScanHistory(pTask);

  // enable pause when init completed.
  if (pTask->historyTaskId.taskId == 0) {
    streamTaskEnablePause(pTask);
  }

  launchFillHistoryTask(pTask);
  return 0;
}

155
/**
 * Re-send a check request to the downstream task named in a previous check response.
 * The new request reuses every identifying field of the response, including the reqId.
 *
 * @param pTask  upstream task that performs the check
 * @param pRsp   response the retry is based on
 * @return always 0
 */
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  // shuffle dispatch: use the endpoint set of every vgroup whose task id matches
  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgList);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    if (pVg->taskId != req.downstreamTaskId) {
      continue;
    }

    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVg->epSet);
  }

  return 0;
}

186
/**
 * Report whether the downstream tasks of pTask have been flagged as ready.
 *
 * @return 1 if pTask->status.downstreamReady equals 1, otherwise 0
 */
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  if (pTask->status.downstreamReady == 1) {
    return 1;
  }

  return 0;
}

/**
 * Run the follow-up actions once every downstream task has answered "ready":
 * flag the task ready, update the stream calc range, launch scan-history when the task
 * is in scan-history status, and finally start the related fill-history task.
 *
 * @param pTask      task whose downstream tasks are all ready
 * @param numOfReqs  number of check requests that were answered (for logging)
 */
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
  streamTaskSetForReady(pTask, numOfReqs);

  const char* idStr = pTask->id.idStr;
  int8_t      curStatus = pTask->status.taskStatus;
  const char* statusName = streamGetTaskStatusStr(curStatus);

  ASSERT(curStatus == TASK_STATUS__SCAN_HISTORY || curStatus == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (curStatus != TASK_STATUS__SCAN_HISTORY) {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", idStr, statusName);
  } else {
    qDebug("s-task:%s enter into scan-history data stage, status:%s", idStr, statusName);
    streamTaskLaunchScanHistory(pTask);
  }

  // the stream task itself is ready now, take care of its fill-history task
  launchFillHistoryTask(pTask);
}

211
/**
 * Handle the response of a downstream check request.
 *
 * A response with status 1 is matched against the request id(s) sent before; once all
 * expected downstream tasks are ready the follow-up processing is started. Any other
 * status makes this function sleep for 100ms and re-send the check request.
 *
 * @param pTask  upstream task that sent the check request; its id must equal
 *               pRsp->upstreamTaskId
 * @param pRsp   response from the downstream task
 * @return 0 on success, -1 if the response's reqId does not match a request sent before
 */
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* idStr = pTask->id.idStr;

  if (pRsp->status != 1) {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", idStr,
           pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
    if (pRsp->reqId != pTask->checkReqId) {
      return -1;
    }

    doProcessDownstreamReadyRsp(pTask, 1);
    return 0;
  }

  // shuffle dispatch: the response must belong to one of the requests sent before
  int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
  bool    matched = false;

  for (int32_t idx = 0; idx < numOfReqs && !matched; ++idx) {
    int64_t* pReqId = taosArrayGet(pTask->checkReqIds, idx);
    matched = (*pReqId == pRsp->reqId);
  }

  if (!matched) {
    return -1;
  }

  int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
  ASSERT(remain >= 0);

  if (remain != 0) {
    int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", idStr,
           pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, remain);
    return 0;
  }

  // last outstanding response: the request id list is no longer needed
  taosArrayDestroy(pTask->checkReqIds);
  pTask->checkReqIds = NULL;

  doProcessDownstreamReadyRsp(pTask, numOfReqs);
  return 0;
}

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
/**
 * Encode a task check response and send it back through the rpc handle of the request.
 *
 * @param pMeta     stream meta, its vgId is used for logging
 * @param pReq      check request being answered; its upstreamNodeId goes into the msg head
 * @param pRsp      response to encode
 * @param pRpcInfo  rpc handle info copied into the response message
 * @param taskId    id of the task, used for logging
 * @return 0 on success, -1 if the response cannot be encoded or the message buffer
 *         cannot be allocated (terrno is set to TSDB_CODE_OUT_OF_MEMORY in that case)
 */
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo *pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code = 0;
  int32_t  len = 0;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    // the buffer is written to right below, never touch it when the allocation failed
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("vgId:%d failed to alloc buffer for task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  // the encoded response follows the message head
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    rpcFreeCont(buf);
    return -1;
  }

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

290
// common
291
/**
 * Switch the task's executor to the operator option used for scanning history data.
 *
 * @return the result of qSetStreamOperatorOptionForScanHistory()
 */
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
  return qSetStreamOperatorOptionForScanHistory(pExecutor);
}
295

296
/**
 * Restore the executor's operator option after the scan-history stage.
 *
 * @return the result of qRestoreStreamOperatorOption()
 */
int32_t streamRestoreParam(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
  return qRestoreStreamOperatorOption(pExecutor);
}
300

301
/**
 * Set the task status to normal unless the task is being dropped.
 *
 * @return 0 if the status has been set to TASK_STATUS__NORMAL, -1 if the task is in
 *         TASK_STATUS__DROPPING and therefore left untouched
 */
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  int32_t prevStatus = atomic_load_8(&pTask->status.taskStatus);

  if (prevStatus == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  }

  qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(prevStatus));
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  return 0;
}

// source
L
liuyao 已提交
314 315 316 317 318 319
/**
 * Hand the version range and time window for step 1 of the history scan to the executor.
 *
 * @return the result of qStreamSourceScanParamForHistoryScanStep1()
 */
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep1(pExecutor, pVerRange, pWindow);
}

/**
 * Hand the version range and time window for step 2 of the history scan to the executor.
 *
 * @return the result of qStreamSourceScanParamForHistoryScanStep2()
 */
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep2(pExecutor, pVerRange, pWindow);
}

L
liuyao 已提交
322
/**
 * Fill a scan-history request with the identity of the given task.
 *
 * Only msgHead.vgId, streamId, taskId and igUntreated are written; every other field of
 * pReq is left as the caller provided it.
 *
 * @return always 0
 */
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  // identity of the task the request belongs to
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;

  // the node id of the task goes into the message head
  pReq->msgHead.vgId = pTask->info.nodeId;

  pReq->igUntreated = igUntreated;
  return 0;
}

330
/**
 * Run the scan executor for the history data of a source task.
 *
 * @return the result of streamScanExec()
 */
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  // second argument of streamScanExec(); presumably a batch size -- TODO confirm
  const int32_t batchArg = 100;
  return streamScanExec(pTask, batchArg);
}

334
/**
 * Tell every downstream task that this task has finished scanning history data.
 *
 * pTask->notReadyTasks is set to the number of downstream tasks that receive the message.
 * When there is no downstream, the finish response handler is invoked directly.
 *
 * @return always 0
 */
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  SStreamScanHistoryFinishReq req = {
      .streamId = pTask->id.streamId,
      .childId = pTask->info.selfChildId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->pMeta->vgId,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    qDebug("s-task:%s no downstream tasks, invoke history finish rsp directly", id);
    streamProcessScanHistoryFinishRsp(pTask);
    return 0;
  }

  // shuffle dispatch: one message per vgroup
  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgList);
  pTask->notReadyTasks = numOfVgs;

  qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", id,
         numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    req.downstreamTaskId = pVg->taskId;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

H
Haojun Liao 已提交
367 368 369 370 371 372
/**
 * Encode a transfer-state request and send it to one downstream task.
 *
 * NOTE(review): the request is encoded with tEncodeStreamScanHistoryFinishReq(); this
 * presumably relies on SStreamTransferReq sharing the layout of
 * SStreamScanHistoryFinishReq -- confirm against the type declarations.
 *
 * @param pTask   task that sends the message (used for logging)
 * @param pReq    request to encode
 * @param vgId    id of the destination vgroup, stored in the message head
 * @param pEpSet  endpoint set of the destination
 * @return 0 on success, -1 if the size computation or the buffer allocation fails (terrno
 *         is set to TSDB_CODE_OUT_OF_MEMORY for the latter), or the negative encoder
 *         result if encoding fails
 */
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));  // payload follows the message head

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
    // release the encoder state as well as the message buffer before bailing out
    tEncoderClear(&encoder);
    rpcFreeCont(buf);
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);

  return 0;
}

/**
 * Send a transfer-state message to every downstream task of pTask.
 * Nothing is sent when the output type is neither fixed nor shuffle dispatch.
 *
 * @return always 0; the results of the individual dispatches are not inspected
 */
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  // shuffle dispatch: one message per vgroup
  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgList);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    req.downstreamTaskId = pVg->taskId;
    doDispatchTransferMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

431
// agg
432
/**
 * Initialize the counter of upstream tasks this task still waits for during scan-history
 * with the size of its upstream endpoint list.
 *
 * @return always 0
 */
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);

  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, pStatus);

  return 0;
}

440
/**
 * Restore the operator option of the executor and then mark its recovery as finished.
 * The second step is skipped when the first one fails.
 *
 * @return 0 on success, -1 if either executor call reports a negative result
 */
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  if (qRestoreStreamOperatorOption(pExecutor) < 0 || qStreamRecoverFinish(pExecutor) < 0) {
    return -1;
  }

  return 0;
}

452 453 454 455 456 457 458
/**
 * Handle a "scan-history finished" request sent by one upstream task.
 *
 * The request is recorded and the counter of upstream tasks still being waited for is
 * decremented. When it reaches zero, an agg task finishes its own scan-history state,
 * the upstream tasks are notified to continue, and pause is enabled for agg tasks.
 *
 * @param pTask     agg or sink task that receives the request
 * @param pReq      request from the upstream task
 * @param pRpcInfo  rpc handle info of the request
 * @return always 0
 */
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t level = pTask->info.taskLevel;
  ASSERT(level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK);

  // sink node do not send end of scan history msg to its upstream, which is agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);

  int32_t remain = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(remain >= 0);

  if (remain != 0) {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, remain);
    return 0;
  }

  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug(
      "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
      "rsp to all upstream tasks",
      pTask->id.idStr, numOfUpstream);

  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamAggUpstreamScanHistoryFinish(pTask);
  }

  streamNotifyUpstreamContinue(pTask);

  // sink node does not receive the pause msg from mnode
  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamTaskEnablePause(pTask);
  }

  return 0;
}

488 489 490 491 492 493 494 495 496 497 498 499
/**
 * Finish the scan-history stage of a task: switch it to normal status, mark it inactive
 * for scheduling, persist it in the meta store, enable pause, and schedule source tasks
 * for execution.
 *
 * @param pTask  task that must currently be in TASK_STATUS__SCAN_HISTORY
 * @return TSDB_CODE_SUCCESS
 */
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);

  // execute in the scan history complete call back msg, ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // persist the task while holding the write lock of the meta
  SStreamMeta* pStreamMeta = pTask->pMeta;
  taosWLockLatch(&pStreamMeta->lock);
  streamMetaSaveTask(pStreamMeta, pTask);
  taosWUnLockLatch(&pStreamMeta->lock);

  streamTaskEnablePause(pTask);

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return TSDB_CODE_SUCCESS;
  }

  streamSchedExec(pTask);
  return TSDB_CODE_SUCCESS;
}

509 510 511 512
/**
 * Set the version range of the fill-history task to [0, current version of the stream
 * task] and start checking the downstream tasks of the fill-history task.
 *
 * @param pTask   stream task, source of the version upper bound
 * @param pHTask  related fill-history task
 */
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  SHistDataRange* pHRange = &pHTask->dataRange;

  pHRange->range.minVer = 0;
  pHRange->range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  } else {
    qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHRange->window.skey, pHRange->window.ekey, pHRange->range.minVer,
           pHRange->range.maxVer);
  }

  // check if downstream tasks have been ready
  streamTaskDoCheckDownstreamTasks(pHTask);
}

H
Haojun Liao 已提交
526 527 528 529 530
// Parameter handed to the tryLaunchHistoryTask() timer callback; it is heap allocated by
// streamLaunchFillHistoryTask() and released by the callback once no retry is needed.
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;  // meta in which the task is looked up again when the timer fires
  int32_t taskId;      // id of the stream task whose fill-history task is to be launched
} SStreamTaskRetryInfo;

531
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
532 533 534
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

535
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
536 537 538 539 540

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
541

H
Haojun Liao 已提交
542
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
543 544 545
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
546
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
547 548 549 550 551 552
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
553

H
Haojun Liao 已提交
554 555 556
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
557

H
Haojun Liao 已提交
558 559
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
560 561 562
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
563 564
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
565
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
566

567
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
568
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
569 570 571
      return;
    }

572 573 574 575
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
576 577 578 579 580

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
581
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
582
  }
H
Haojun Liao 已提交
583 584

  taosMemoryFree(pInfo);
585 586 587 588
}

// todo fix the bug: 2. race condition
// a fill-history task needs to be started.
589
/**
 * Launch the fill-history task related to pTask.
 *
 * When the fill-history task is already present in the meta, its launch conditions are
 * set and its downstream check is started immediately. Otherwise a timer is armed that
 * retries the launch after 100ms through tryLaunchHistoryTask().
 *
 * @param pTask  stream task that owns a fill-history task
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (terrno is set as well) when the
 *         retry info for the timer cannot be allocated
 */
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
  if (pHTask != NULL) {
    doCheckDownstreamStatus(pTask, *pHTask);
    return TSDB_CODE_SUCCESS;
  }

  qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
        pMeta->vgId, hTaskId);

  SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
  if (pInfo == NULL) {
    // without the retry info the timer callback has nothing to work on
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("s-task:%s vgId:%d failed to alloc retry info to launch history task:0x%x", pTask->id.idStr, pMeta->vgId,
           hTaskId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pInfo->taskId = pTask->id.taskId;
  pInfo->pMeta = pTask->pMeta;

  if (pTask->launchTaskTimer == NULL) {
    pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
    if (pTask->launchTaskTimer == NULL) {
      // todo failed to create timer
      taosMemoryFree(pInfo);
    } else {
      pTask->status.timerActive = 1;  // timer is active
      qDebug("s-task:%s set timer active flag", pTask->id.idStr);
    }
  } else {  // timer exists
    pTask->status.timerActive = 1;
    qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
    taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
  }

  // the launch is tried again by the timer in 100ms
  return TSDB_CODE_SUCCESS;
}

626 627 628 629 630 631 632 633 634 635 636 637
/**
 * Wrap up the scan-history stage: restore the executor parameters and notify all
 * downstream tasks. Nothing is done for a task that is being dropped.
 *
 * @return 0 on success or when the task is dropping, -1 if restoring the parameters or
 *         dispatching the finish message reports a negative result
 */
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchScanHistoryFinishMsg(pTask) < 0) {
    return -1;
  }

  return 0;
}

L
liuyao 已提交
646 647 648 649 650 651 652 653 654 655 656 657 658 659 660
/**
 * Ask the executor whether step 1 of the recover scan has finished.
 *
 * @return the result of qStreamRecoverScanStep1Finished()
 */
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep1Finished(pTask->exec.pExecutor);
}

/**
 * Ask the executor whether step 2 of the recover scan has finished.
 *
 * @return the result of qStreamRecoverScanStep2Finished()
 */
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep2Finished(pTask->exec.pExecutor);
}

/**
 * Tell the executor that all steps of the recover scan are finished.
 *
 * @return the result of qStreamRecoverSetAllStepFinished()
 */
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  return qStreamRecoverSetAllStepFinished(pTask->exec.pExecutor);
}

661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681
/**
 * Prepare the version range for the secondary scan (step 2) of a fill-history task.
 *
 * The new range is [previous maxVer + 1, current wal reader version - 1]. If that range
 * is empty, all scan steps are marked finished instead and the range is left unchanged.
 */
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
  SVersionRange* pVerRange = &pTask->dataRange.range;
  int64_t        walVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
  ASSERT(walVer >= pVerRange->maxVer);

  int64_t firstVer = pVerRange->maxVer + 1;
  int64_t lastVer = walVer - 1;

  if (firstVer <= lastVer) {
    // do the secondary scan of the history data: the time window remains, only the
    // version range moves forward
    pVerRange->minVer = firstVer;
    pVerRange->maxVer = lastVer;
    return;
  }

  // no input data yet, no need to execute the secondary scan while the stream task halts
  streamTaskRecoverSetAllStepFinished(pTask);
  qDebug(
      "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
      pTask->id.idStr);
}


682
/**
 * Encode a task check request.
 * Field order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId.
 *
 * @return pEncoder->pos after encoding, or -1 as soon as one encoding step fails
 */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  // short-circuit evaluation stops at the first failing step
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

695
/**
 * Decode a task check request; fields are read in the order they are encoded by
 * tEncodeStreamTaskCheckReq().
 *
 * @return 0 on success, -1 as soon as one decoding step fails
 */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  // short-circuit evaluation stops at the first failing step
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

708
/**
 * Encode a task check response.
 * Field order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId, status.
 *
 * @return pEncoder->pos after encoding, or -1 as soon as one encoding step fails
 */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  // short-circuit evaluation stops at the first failing step
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
      tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

722
/**
 * Decode a task check response; fields are read in the order they are encoded by
 * tEncodeStreamTaskCheckRsp().
 *
 * @return 0 on success, -1 as soon as one decoding step fails
 */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  // short-circuit evaluation stops at the first failing step
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
      tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

736
/**
 * Encode a scan-history finish request.
 * Field order: streamId, upstreamTaskId, upstreamNodeId, downstreamTaskId, childId.
 *
 * @return pEncoder->pos after encoding, or -1 as soon as one encoding step fails
 */
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
  // short-circuit evaluation stops at the first failing step
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
746

747
/**
 * Decode a scan-history finish request; fields are read in the order they are encoded by
 * tEncodeStreamScanHistoryFinishReq().
 *
 * @return 0 on success, -1 as soon as one decoding step fails
 */
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
  // short-circuit evaluation stops at the first failing step
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
757

758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795
/**
 * Adjust the data range used by the stream task for its regular calculation.
 *
 * Without a related fill-history task the range is only logged. With one, the time
 * window becomes [old ekey + 1, INT64_MAX] and the version range becomes [0, old minVer].
 */
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  SHistDataRange* pDataRange = &pTask->dataRange;
  STimeWindow*    pWin = &pDataRange->window;
  SVersionRange*  pVerRange = &pDataRange->range;

  if (pTask->historyTaskId.taskId == 0) {
    qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
           " - %" PRId64,
           pTask->id.idStr, pWin->skey, pWin->ekey, pVerRange->minVer, pVerRange->maxVer);
    return;
  }

  // capture the old bounds before they are overwritten
  int64_t nextSkey = pWin->ekey + 1;
  int64_t prevMinVer = pVerRange->minVer;

  pWin->skey = nextSkey;
  pWin->ekey = INT64_MAX;
  pVerRange->minVer = 0;
  pVerRange->maxVer = prevMinVer;

  qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
         ", verRang:%" PRId64 " - %" PRId64,
         pTask->id.idStr, pTask->info.taskLevel, pWin->skey, pWin->ekey, pVerRange->minVer, pVerRange->maxVer);
}

/**
 * Launch the fill-history task related to pTask, if there is one.
 * The downstream tasks of pTask must already be flagged ready.
 */
void launchFillHistoryTask(SStreamTask* pTask) {
  int32_t hTaskId = pTask->historyTaskId.taskId;

  if (hTaskId != 0) {
    ASSERT(pTask->status.downstreamReady == 1);
    qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, hTaskId);

    // launch associated fill history task
    streamLaunchFillHistoryTask(pTask);
  }
}

/**
 * Start the downstream check for a regular stream task.
 * Fill-history tasks are skipped here; they wait for being launched by their stream task.
 */
void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  if (!pTask->info.fillHistory) {
    ASSERT(pTask->status.downstreamReady == 0);

    // check downstream tasks for itself
    streamTaskDoCheckDownstreamTasks(pTask);
    return;
  }

  qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
}
806 807 808 809 810 811 812 813 814 815 816 817 818 819 820

/**
 * Put the task into pause status.
 *
 * The call polls every 100ms until pause is allowed for the task, then saves the current
 * status in keepTaskStatus and sets the status to TASK_STATUS__PAUSE.
 */
void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pStreamMeta = pTask->pMeta;
  int64_t      startTs = taosGetTimestampMs();

  for (;;) {
    if (pTask->status.pauseAllowed) {
      break;
    }

    qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pStreamMeta->vgId);
    taosMsleep(100);
  }

  // remember the status the task had before it was paused
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);

  int64_t elapsedMs = taosGetTimestampMs() - startTs;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pStreamMeta->vgId, pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)elapsedMs);
}

// todo fix race condition
/**
 * Forbid pausing the task.
 * While the task is currently paused, the call polls every 100ms until the pause has been
 * cancelled and only then clears the pauseAllowed flag.
 */
void streamTaskDisablePause(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  // pre-condition check: the task must not be in pause status
  for (;;) {
    if (pTask->status.taskStatus != TASK_STATUS__PAUSE) {
      break;
    }

    taosMsleep(100);
    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, check in 100ms", idStr);
  }

  qDebug("s-task:%s disable task pause", idStr);
  pTask->status.pauseAllowed = 0;
}

/**
 * Allow pausing the task by setting its pauseAllowed flag.
 */
void streamTaskEnablePause(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  qDebug("s-task:%s enable task pause", idStr);
  pTask->status.pauseAllowed = 1;
}