streamRecover.c 33.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

20 21 22 23 24 25
// Forward declarations of helpers whose definitions appear later in this file.
// Launches the fill-history task related to pTask; returns immediately when no history task id is set.
static void launchFillHistoryTask(SStreamTask* pTask);
// Logs the data range of pTask or, when a related fill-history task exists, rewrites the stream task's
// time window and version range.
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);

// Flag the downstream of pTask as ready and log the time elapsed since pTask->tsInfo.init.
// numOfReqs is only reported in the log line.
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
  // the flag is expected to be raised exactly once
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;

  const int64_t elapsedMs = taosGetTimestampMs() - pTask->tsInfo.init;

  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", pTask->id.idStr,
         numOfReqs, (int32_t)elapsedMs, streamGetTaskStatusStr(pTask->status.taskStatus));
}

L
liuyao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// Build a scan-history request for pTask and put it on the task's stream queue.
//
// pTask:       task whose ids and node id are copied into the request
// igUntreated: forwarded verbatim into the request
// Returns 0 on success, -1 when the message buffer cannot be allocated or the message cannot be queued.
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  // Zero-initialize: the builder only fills some fields, and the whole struct is copied into the
  // message below, so anything left unset would otherwise carry indeterminate stack bytes.
  SStreamScanHistoryReq req = {0};
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);

  int32_t len = sizeof(SStreamScanHistoryReq);
  void*   serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    // NOTE(review): the buffer is not released here; presumably tmsgPutToQueue owns it on failure - confirm.
    qError("s-task:%s failed to put scan-history req into stream queue", pTask->id.idStr);
    return -1;
  }

  return 0;
}

52 53 54
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
55
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
56
    case TASK_STATUS__HALT: return "halt";
57
    case TASK_STATUS__PAUSE: return "paused";
58
    case TASK_STATUS__DROPPING: return "dropping";
59 60 61
    default:return "";
  }
}
62

63 64
// Configure the executor for scan-history (step 1) using the task's version range and time window,
// then queue the scan-history request. Returns the result of streamStartRecoverTask.
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  streamSetParamForScanHistory(pTask);
  streamSetParamForStreamScannerStep1(pTask, &pTask->dataRange.range, &pTask->dataRange.window);

  return streamStartRecoverTask(pTask, 0);
}
71

72 73 74
// Start the scan-history procedure according to the task level:
//  - source: launch the scan when the task is in scan-history status, otherwise only log;
//  - agg:    switch the executor to scan-history options and record the upstream tasks to wait for;
//  - sink:   only record the upstream tasks to wait for.
// Returns the launch result for a source task in scan-history status, 0 otherwise.
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  const int32_t level = pTask->info.taskLevel;

  if (level == TASK_LEVEL__AGG) {
    streamSetParamForScanHistory(pTask);
    streamTaskScanHistoryPrepare(pTask);
    return 0;
  }

  if (level == TASK_LEVEL__SINK) {
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
    streamTaskScanHistoryPrepare(pTask);
    return 0;
  }

  if (level != TASK_LEVEL__SOURCE) {
    return 0;
  }

  if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
    return doLaunchScanHistoryTask(pTask);
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
  qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
         walReaderGetCurrentVer(pTask->exec.pWalReader));
  return 0;
}

92
// check status
93
// Send a check request to every downstream task of pTask.
//
//  - fixed dispatch:   one request; its id is stored in pTask->checkReqId.
//  - shuffle dispatch: one request per vgroup; the ids are collected in pTask->checkReqIds and
//                      pTask->notReadyTasks is set to the number of vgroups.
//  - otherwise (no downstream): the task is marked ready right away, scan-history is launched and
//                      the related fill-history task is started.
//
// Returns 0 on success, -1 (terrno set) when the request id list cannot be allocated.
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  STimeWindow*    pWindow = &pRange->window;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  // serialize
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    // allocate the id list first, so that nothing is dispatched when it cannot be recorded
    SArray* pReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
    if (pReqIds == NULL) {
      qError("s-task:%s failed to allocate check req id list for %d downstream tasks", pTask->id.idStr, numOfVgs);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pTask->notReadyTasks = numOfVgs;
    pTask->checkReqIds = pReqIds;

    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr,
             pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);

    streamTaskSetForReady(pTask, 0);
    streamTaskSetRangeStreamCalc(pTask);
    streamTaskLaunchScanHistory(pTask);

    launchFillHistoryTask(pTask);
  }

  return 0;
}

150
// Re-send the check request described by pRsp to the downstream task that answered it.
// For shuffle dispatch the request goes to every vgroup entry whose task id matches the
// downstream task id of the response. Always returns 0.
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  // rebuild the request from the fields echoed back in the response
  SStreamTaskCheckReq req = {0};
  req.reqId = pRsp->reqId;
  req.streamId = pRsp->streamId;
  req.upstreamTaskId = pRsp->upstreamTaskId;
  req.upstreamNodeId = pRsp->upstreamNodeId;
  req.downstreamTaskId = pRsp->downstreamTaskId;
  req.downstreamNodeId = pRsp->downstreamNodeId;
  req.childId = pRsp->childId;

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t vgCount = taosArrayGetSize(pVgList);

  for (int32_t idx = 0; idx < vgCount; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    if (pVg->taskId != req.downstreamTaskId) {
      continue;
    }

    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVg->epSet);
  }

  return 0;
}

181
// Report whether the downstream of pTask has been flagged as ready: 1 if so, 0 otherwise.
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  if (pTask->status.downstreamReady == 1) {
    return 1;
  }

  return 0;
}

// Called once all downstream tasks have answered "ready": flag the task as ready, adjust or log its
// data range, launch scan-history when the task is in that status, and finally start the related
// fill-history task (if any).
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
  streamTaskSetForReady(pTask, numOfReqs);

  const char* idStr = pTask->id.idStr;
  int8_t      taskStatus = pTask->status.taskStatus;
  const char* statusStr = streamGetTaskStatusStr(taskStatus);
  const bool  inScanHistory = (taskStatus == TASK_STATUS__SCAN_HISTORY);

  ASSERT(inScanHistory || taskStatus == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (!inScanHistory) {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", idStr, statusStr);
  } else {
    qDebug("s-task:%s enter into scan-history data stage, status:%s", idStr, statusStr);
    streamTaskLaunchScanHistory(pTask);
  }

  // the stream task itself is ready now, so the related fill-history task can be checked
  launchFillHistoryTask(pTask);
}

206
// Handle a downstream check response.
//
//  - status != 1: the downstream is not ready; sleep 100ms and send the check request again.
//  - status == 1, fixed dispatch: the response must carry the recorded request id; then the task
//    continues with the "downstream ready" processing.
//  - status == 1, shuffle dispatch: the request id must be one of the recorded ids; the counter of
//    not-ready downstream tasks is decremented and, when it reaches zero, the id list is released
//    and the "downstream ready" processing runs.
//
// Returns -1 when the request id of the response is unknown, 0 otherwise.
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* idStr = pTask->id.idStr;

  if (pRsp->status != 1) {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", idStr,
           pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
    if (pRsp->reqId != pTask->checkReqId) {
      return -1;
    }

    doProcessDownstreamReadyRsp(pTask, 1);
    return 0;
  }

  // shuffle dispatch: look the request id up in the list of outstanding check requests
  int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
  int32_t pos = 0;
  while (pos < numOfReqs && *(int64_t*)taosArrayGet(pTask->checkReqIds, pos) != pRsp->reqId) {
    pos++;
  }

  if (pos >= numOfReqs) {
    return -1;
  }

  int32_t remaining = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
  ASSERT(remaining >= 0);

  if (remaining != 0) {
    int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", idStr,
           pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, remaining);
    return 0;
  }

  taosArrayDestroy(pTask->checkReqIds);
  pTask->checkReqIds = NULL;

  doProcessDownstreamReadyRsp(pTask, numOfReqs);
  return 0;
}

259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
// Encode pRsp behind an SMsgHead addressed to the upstream node of pReq and send it as the rpc
// response for pRpcInfo.
//
// pMeta / taskId: only used for error logging.
// Returns 0 when the response has been handed to tmsgSendRsp, -1 when the response cannot be
// sized, allocated or encoded (nothing is sent in that case).
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo *pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    // the head is written right below, so a failed allocation must not fall through
    qError("vgId:%d failed to allocate task check rsp msg, s-task:0x%x", pMeta->vgId, taskId);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    rpcFreeCont(buf);
    return -1;
  }

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

285
// common
286
// Switch the task's executor to the operator options used for scanning history data.
// Returns the result of qSetStreamOperatorOptionForScanHistory.
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);

  int32_t code = qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
  return code;
}
290

291
// Restore the executor's operator options once scan-history is over.
// Returns the result of qRestoreStreamOperatorOption.
int32_t streamRestoreParam(SStreamTask* pTask) {
  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);

  int32_t code = qRestoreStreamOperatorOption(pTask->exec.pExecutor);
  return code;
}
295

296
// Move the task to the normal status unless it is being dropped.
// Returns 0 when the status has been stored, -1 when the task is in dropping state.
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  const int32_t prevStatus = atomic_load_8(&pTask->status.taskStatus);

  if (prevStatus != TASK_STATUS__DROPPING) {
    qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(prevStatus));
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
    return 0;
  }

  qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
  return -1;
}

// source
L
liuyao 已提交
309 310 311 312 313 314
// Pass the version range and time window of history-scan step 1 to the task's executor.
// Returns the result of qStreamSourceScanParamForHistoryScanStep1.
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep1(pExecutor, pVerRange, pWindow);
}

// Pass the version range and time window of history-scan step 2 to the task's executor.
// Returns the result of qStreamSourceScanParamForHistoryScanStep2.
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep2(pExecutor, pVerRange, pWindow);
}

L
liuyao 已提交
317
// Fill a scan-history request from pTask: target vgroup, stream id, task id and the caller-supplied
// igUntreated flag. Fields not listed here are left untouched. Always returns 0.
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  // identity of the task the request is for
  pReq->streamId = pTask->id.streamId;
  pReq->taskId = pTask->id.taskId;

  // node the request is addressed to
  pReq->msgHead.vgId = pTask->info.nodeId;

  pReq->igUntreated = igUntreated;
  return 0;
}

325
// Run the history scan of a source task by calling streamScanExec with a second argument of 100.
// Returns whatever streamScanExec returns.
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  int32_t code = streamScanExec(pTask, 100);
  return code;
}

329
// Tell every downstream task that this task finished scanning history data.
//
//  - fixed dispatch:   one message; pTask->notReadyTasks is set to 1.
//  - shuffle dispatch: one message per vgroup; pTask->notReadyTasks is set to the vgroup count.
//  - no downstream:    the finish response is processed locally right away.
//
// Always returns 0.
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamScanHistoryFinishReq req = {0};
  req.streamId = pTask->id.streamId;
  req.childId = pTask->info.selfChildId;
  req.upstreamTaskId = pTask->id.taskId;
  req.upstreamNodeId = pTask->pMeta->vgId;

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t vgCount = taosArrayGetSize(pVgList);
    pTask->notReadyTasks = vgCount;

    qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
           pTask->id.idStr, vgCount, streamGetTaskStatusStr(pTask->status.taskStatus));

    for (int32_t idx = 0; idx < vgCount; ++idx) {
      SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
      req.downstreamTaskId = pVg->taskId;
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVg->vgId, &pVg->epSet);
    }
    return 0;
  }

  qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
  streamProcessScanHistoryFinishRsp(pTask);
  return 0;
}

H
Haojun Liao 已提交
362 363 364 365 366 367
// Encode a transfer-state request and send it (no response expected) to one downstream endpoint.
//
// pTask:  sender, used for logging only
// pReq:   request to encode
// vgId:   stored in network byte order in the message head
// pEpSet: endpoint set the message is sent to
//
// Returns 0 after the message has been handed to tmsgSendReq, -1 when sizing or allocation fails
// (terrno is set on allocation failure), or the negative encoder result when encoding fails.
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  int32_t code = -1;
  int32_t tlen = 0;

  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq);

  // clear the encoder on both the success and the failure path; previously the failure path
  // returned without clearing it
  tEncoderClear(&encoder);

  if (code < 0) {
    rpcFreeCont(buf);
    return code;
  }

  SRpcMsg msg = {0};
  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);

  return 0;
}

// Send a transfer-state message to each downstream task: one for fixed dispatch, one per vgroup
// for shuffle dispatch, none for any other output type. Always returns 0.
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = {0};
  req.streamId = pTask->id.streamId;
  req.childId = pTask->info.selfChildId;

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputInfo.type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t vgCount = taosArrayGetSize(pVgList);

  for (int32_t idx = 0; idx < vgCount; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    req.downstreamTaskId = pVg->taskId;
    doDispatchTransferMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

426
// agg
427
// Record how many upstream tasks still have to report the end of their scan-history procedure:
// the counter starts at the size of the upstream endpoint list. Always returns 0.
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  pTask->numOfWaitingUpstream = numOfUpstream;

  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));

  return 0;
}

435
// Restore the executor's operator options and then mark the recovery as finished on the executor.
// The second call is skipped when the first one fails. Returns 0 on success, -1 on failure.
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  if (qRestoreStreamOperatorOption(pExecutor) < 0 || qStreamRecoverFinish(pExecutor) < 0) {
    return -1;
  }

  return 0;
}

447 448 449 450 451 452 453
// Handle a "scan-history finished" message from one upstream task (agg and sink tasks only).
// The message is remembered, the counter of waiting upstream tasks is decremented and, once it
// reaches zero, an agg task restores its executor, all upstream tasks are notified to continue,
// and pause is enabled for an agg task. Always returns 0.
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t    level = pTask->info.taskLevel;
  const bool isAgg = (level == TASK_LEVEL__AGG);
  ASSERT(isAgg || level == TASK_LEVEL__SINK);

  // sink node do not send end of scan history msg to its upstream, which is agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);

  int32_t remaining = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(remaining >= 0);

  if (remaining != 0) {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, remaining);
    return 0;
  }

  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug(
      "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
      "rsp to all upstream tasks",
      pTask->id.idStr, numOfUpstream);

  if (isAgg) {
    streamAggUpstreamScanHistoryFinish(pTask);
  }

  streamNotifyUpstreamContinue(pTask);

  // sink node does not receive the pause msg from mnode, so does not need enable it
  if (isAgg) {
    streamTaskEnablePause(pTask);
  }

  return 0;
}

483 484 485 486 487 488 489 490 491 492 493 494
// Finish the scan-history stage of pTask: switch it to normal status, mark it inactive for
// scheduling, persist it under the meta write lock, enable pause, and schedule a source task for
// execution. Returns TSDB_CODE_SUCCESS.
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  SStreamMeta* pStreamMeta = pTask->pMeta;

  // from here on the task is ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // persist the task while holding the meta write lock
  taosWLockLatch(&pStreamMeta->lock);
  streamMetaSaveTask(pStreamMeta, pTask);
  taosWUnLockLatch(&pStreamMeta->lock);

  // history data scan in the stream time window finished, now let's enable the pause
  streamTaskEnablePause(pTask);

  const bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  if (isSource) {
    streamSchedExec(pTask);
  }

  return TSDB_CODE_SUCCESS;
}

505 506 507 508
// Set the version range of the fill-history task pHTask to [0, current version of pTask] and then
// start checking the downstream tasks of pHTask.
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  SHistDataRange* pHRange = &pHTask->dataRange;

  pHRange->range.minVer = 0;
  pHRange->range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  } else {
    qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHRange->window.skey, pHRange->window.ekey, pHRange->range.minVer,
           pHRange->range.maxVer);
  }

  // check if downstream tasks have been ready
  streamTaskDoCheckDownstreamTasks(pHTask);
}

H
Haojun Liao 已提交
522 523 524 525 526
// Context passed to the tryLaunchHistoryTask timer callback. It is heap-allocated by
// streamLaunchFillHistoryTask and freed either by the callback or, when the timer cannot be
// started, by the launcher itself.
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;  // meta object whose task table is searched for taskId
  int32_t taskId;      // id of the stream task whose history task should be launched
} SStreamTaskRetryInfo;

527
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
528 529 530
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

531
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
532 533 534 535 536

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
537

H
Haojun Liao 已提交
538
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
539 540 541
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
542
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
543 544 545 546 547 548
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
549

H
Haojun Liao 已提交
550 551 552
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
553

H
Haojun Liao 已提交
554 555
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
556 557 558
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
559 560
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
561
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
562

563
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
564
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
565 566 567
      return;
    }

568 569 570 571
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
572 573 574 575 576

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
577
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
578
  }
H
Haojun Liao 已提交
579 580

  taosMemoryFree(pInfo);
581 582 583 584
}

// TODO: fix the bug: 2. race condition
// A fill-history task needs to be started.
585
// Launch the fill-history task related to pTask.
//
// When the history task is already present in the meta task table, its launch condition is set and
// its downstream check starts immediately. Otherwise a 100ms timer (tryLaunchHistoryTask) is armed
// to retry; the timer callback takes ownership of the retry context allocated here.
//
// Returns TSDB_CODE_SUCCESS, or -1 (terrno set) when the retry context cannot be allocated.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
  if (pHTask != NULL) {
    doCheckDownstreamStatus(pTask, *pHTask);
    return TSDB_CODE_SUCCESS;
  }

  qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
        pMeta->vgId, hTaskId);

  SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
  if (pInfo == NULL) {
    // the fields are written right below, so a failed allocation must not fall through
    qError("s-task:%s failed to allocate retry info to launch history task:0x%x", pTask->id.idStr, hTaskId);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pInfo->taskId = pTask->id.taskId;
  pInfo->pMeta = pTask->pMeta;

  if (pTask->launchTaskTimer == NULL) {
    pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
    if (pTask->launchTaskTimer == NULL) {
      // todo failed to create timer: no retry will happen, the context is released here
      qError("s-task:%s failed to start timer to launch history task:0x%x", pTask->id.idStr, hTaskId);
      taosMemoryFree(pInfo);
    } else {
      pTask->status.timerActive = 1;  // timer is active
      qDebug("s-task:%s set timer active flag", pTask->id.idStr);
    }
  } else {  // timer exists
    pTask->status.timerActive = 1;
    qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
    taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
  }

  // the launch is retried from the timer callback after 100ms
  return TSDB_CODE_SUCCESS;
}

622 623 624 625 626 627 628 629 630 631 632 633
// Wrap up the scan-history stage: restore the executor parameters and dispatch the finish message
// to the downstream tasks. A task in dropping status is left alone.
// Returns 0 on success (or when dropping), -1 when either step fails.
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  const bool isDropping = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING);
  if (isDropping) {
    return 0;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchScanHistoryFinishMsg(pTask) < 0) {
    return -1;
  }

  return 0;
}

L
liuyao 已提交
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
// Ask the task's executor whether recover-scan step 1 has finished.
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep1Finished(pTask->exec.pExecutor);
}

// Ask the task's executor whether recover-scan step 2 has finished.
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep2Finished(pTask->exec.pExecutor);
}

// Tell the task's executor that all recover-scan steps are finished.
// Returns the result of qStreamRecoverSetAllStepFinished.
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  return qStreamRecoverSetAllStepFinished(pTask->exec.pExecutor);
}

657
// Prepare the version range for the secondary (step 2) history scan.
//
// The candidate range is [maxVer + 1, latestVer - 1]. When it is empty, all recover steps are
// marked as finished and true is returned (nothing left to scan). Otherwise the task's version
// range is updated to the candidate range and false is returned.
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
  SVersionRange* pVerRange = &pTask->dataRange.range;
  ASSERT(latestVer >= pVerRange->maxVer);

  const int64_t firstVer = pVerRange->maxVer + 1;
  const int64_t lastVer = latestVer - 1;

  if (firstVer <= lastVer) {
    // do the secondary scan of the history data; the time window stays as it is and only the
    // version range moves forward
    pVerRange->minVer = firstVer;
    pVerRange->maxVer = lastVer;
    return false;
  }

  // no input data yet, so there is no need to execute the secondary scan while the stream task halts
  streamTaskRecoverSetAllStepFinished(pTask);
  qDebug(
      "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
      "related stream task currentVer:%" PRId64,
      pTask->id.idStr, latestVer);
  return true;
}


680
// Encode a task check request. Field order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId.
// Returns the encoder position on success, -1 as soon as one step fails.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  // '||' short-circuits, so the fields are written in order and encoding stops at the first failure
  const bool failed = tStartEncode(pEncoder) < 0 ||
                      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
                      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
                      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
                      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
                      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
                      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
                      tEncodeI32(pEncoder, pReq->childId) < 0;
  if (failed) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

693
// Decode a task check request. Field order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId.
// Returns 0 on success, -1 as soon as one step fails.
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  // '||' short-circuits, so the fields are read in order and decoding stops at the first failure
  const bool failed = tStartDecode(pDecoder) < 0 ||
                      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
                      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->childId) < 0;
  if (failed) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

706
// Encode a task check response. Field order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, status.
// Returns the encoder position on success, -1 as soon as one step fails.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  // '||' short-circuits, so the fields are written in order and encoding stops at the first failure
  const bool failed = tStartEncode(pEncoder) < 0 ||
                      tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
                      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
                      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
                      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
                      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
                      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
                      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
                      tEncodeI8(pEncoder, pRsp->status) < 0;
  if (failed) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

720
// Decode a task check response. Field order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, status.
// Returns 0 on success, -1 as soon as one step fails.
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  // '||' short-circuits, so the fields are read in order and decoding stops at the first failure
  const bool failed = tStartDecode(pDecoder) < 0 ||
                      tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
                      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
                      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
                      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
                      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
                      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
                      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
                      tDecodeI8(pDecoder, &pRsp->status) < 0;
  if (failed) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

734
// Encode a scan-history finish request. Field order: streamId, upstreamTaskId, upstreamNodeId,
// downstreamTaskId, childId.
// Returns the encoder position on success, -1 as soon as one step fails.
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
  // '||' short-circuits, so the fields are written in order and encoding stops at the first failure
  const bool failed = tStartEncode(pEncoder) < 0 ||
                      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
                      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
                      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
                      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
                      tEncodeI32(pEncoder, pReq->childId) < 0;
  if (failed) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
744

745
// Decode a scan-history finish request. Field order: streamId, upstreamTaskId, upstreamNodeId,
// downstreamTaskId, childId.
// Returns 0 on success, -1 as soon as one step fails.
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
  // '||' short-circuits, so the fields are read in order and decoding stops at the first failure
  const bool failed = tStartDecode(pDecoder) < 0 ||
                      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
                      tDecodeI32(pDecoder, &pReq->childId) < 0;
  if (failed) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
755

756 757 758
// Set up the data range (time window + version range) used by the stream computation of this task.
//
// When the task has no related fill-history task (historyTaskId.taskId == 0) the range is left
// untouched and only logged. Otherwise the range is rewritten in place:
//   - the time window becomes [old ekey + 1, INT64_MAX] (old ekey is kept as-is if it is already INT64_MAX)
//   - the version range becomes [0, old minVer]
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  SHistDataRange* pDataRange = &pTask->dataRange;

  if (pTask->historyTaskId.taskId == 0) {
    // nothing to adjust, just report the range this task is going to work with
    if (pTask->info.fillHistory == 1) {
      qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
             pTask->id.idStr, pDataRange->window.skey, pDataRange->window.ekey, pDataRange->range.minVer,
             pDataRange->range.maxVer);
    } else {
      qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64
             ", verRange:%" PRId64 "-%" PRId64,
             pTask->id.idStr, pDataRange->window.skey, pDataRange->window.ekey, pDataRange->range.minVer,
             pDataRange->range.maxVer);
    }
    return;
  }

  // start right after the previous end key; skip the +1 at INT64_MAX to avoid signed overflow
  const int64_t prevEndKey = pDataRange->window.ekey;
  const int64_t newStartKey = (prevEndKey < INT64_MAX) ? (prevEndKey + 1) : prevEndKey;

  // the previous lower version bound becomes the new upper bound
  const int64_t prevMinVer = pDataRange->range.minVer;

  pDataRange->window.skey = newStartKey;
  pDataRange->window.ekey = INT64_MAX;
  pDataRange->range.minVer = 0;
  pDataRange->range.maxVer = prevMinVer;

  qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64
         " - %" PRId64 ", verRang:%" PRId64 " - %" PRId64,
         pTask->id.idStr, pTask->info.taskLevel, pDataRange->window.skey, pDataRange->window.ekey,
         pDataRange->range.minVer, pDataRange->range.maxVer);
}

// Launch the fill-history task associated with pTask, if there is one.
// Does nothing when historyTaskId.taskId is 0. The downstream tasks of pTask are
// expected (asserted) to be ready before the launch.
void launchFillHistoryTask(SStreamTask* pTask) {
  const int32_t histTaskId = pTask->historyTaskId.taskId;

  if (histTaskId != 0) {
    ASSERT(pTask->status.downstreamReady == 1);
    qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, histTaskId);

    // launch associated fill history task
    streamLaunchFillHistoryTask(pTask);
  }
}

// Start the downstream check for a task.
// For a fill-history task this only logs that the task is waiting to be launched. For any
// other task, downstreamReady is asserted to be 0 and the downstream check is issued.
void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  if (!pTask->info.fillHistory) {
    ASSERT(pTask->status.downstreamReady == 0);

    // check downstream tasks for itself
    streamTaskDoCheckDownstreamTasks(pTask);
  } else {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
  }
}
816

817
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
818 819 820 821
void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  int64_t st = taosGetTimestampMs();
822 823 824 825 826 827 828 829 830 831 832 833 834 835

  int8_t status = pTask->status.taskStatus;
  if (status == TASK_STATUS__DROPPING) {
    qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
    return;
  }

  const char* str = streamGetTaskStatusStr(status);
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
    qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
    return;
  }

  while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
836 837
    status = pTask->status.taskStatus;
    if (status == TASK_STATUS__DROPPING) {
838 839 840 841
      qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
      return;
    }

842 843 844 845 846
    if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
      qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
      return;
    }

847 848 849 850 851 852 853 854 855
    qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId);
    taosMsleep(100);
  }

  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);

  int64_t el = taosGetTimestampMs() - st;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
H
Haojun Liao 已提交
856
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
857 858
}

859 860 861 862 863 864 865 866 867 868 869
// Resume a paused task: restore the status saved in keepTaskStatus and reset
// keepTaskStatus to TASK_STATUS__NORMAL. A task that is not in pause status is
// left untouched and an error is logged.
void streamTaskResume(SStreamTask* pTask) {
  const int8_t curStatus = pTask->status.taskStatus;

  if (curStatus != TASK_STATUS__PAUSE) {
    qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(curStatus));
    return;
  }

  pTask->status.taskStatus = pTask->status.keepTaskStatus;
  pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
  qDebug("s-task:%s resume from pause", pTask->id.idStr);
}

870 871 872
// Forbid pausing of the task by clearing pauseAllowed.
// If the task is currently paused, the call first waits (polling every 100ms) until the
// pause status is gone.
// todo fix race condition: the status check and the flag update are not done atomically
void streamTaskDisablePause(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  // pre-condition check: the task must have left the pause status
  for (;;) {
    if (pTask->status.taskStatus != TASK_STATUS__PAUSE) {
      break;
    }

    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", idStr);
    taosMsleep(100);
  }

  qDebug("s-task:%s disable task pause", idStr);
  pTask->status.pauseAllowed = 0;
}

// Allow pausing of the task again by setting pauseAllowed to 1.
void streamTaskEnablePause(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  qDebug("s-task:%s enable task pause", idStr);
  pTask->status.pauseAllowed = 1;
}

// Put the task into TASK_STATUS__HALT and remember the previous status in keepTaskStatus.
// Nothing happens when the task is dropping, stopped, or already halted.
// A paused task is upgraded to halt as well; only the log message differs in that case.
void streamTaskHalt(SStreamTask* pTask) {
  const int8_t prevStatus = pTask->status.taskStatus;

  // these states are never replaced by halt
  if (prevStatus == TASK_STATUS__DROPPING || prevStatus == TASK_STATUS__STOP || prevStatus == TASK_STATUS__HALT) {
    return;
  }

  if (prevStatus != TASK_STATUS__PAUSE) {
    qDebug("s-task:%s halt task", pTask->id.idStr);
  } else {
    // upgrade to halt status
    qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
           streamGetTaskStatusStr(TASK_STATUS__PAUSE));
  }

  pTask->status.keepTaskStatus = prevStatus;
  pTask->status.taskStatus = TASK_STATUS__HALT;
}

// Leave the halt status: restore the status saved in keepTaskStatus and reset
// keepTaskStatus to TASK_STATUS__NORMAL. A task that is not halted is left
// untouched and an error is logged.
void streamTaskResumeFromHalt(SStreamTask* pTask) {
  const char*  idStr = pTask->id.idStr;
  const int8_t curStatus = pTask->status.taskStatus;

  if (curStatus == TASK_STATUS__HALT) {
    pTask->status.taskStatus = pTask->status.keepTaskStatus;
    pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
    qDebug("s-task:%s resume from halt, current status:%s", idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
  } else {
    qError("s-task:%s not in halt status, status:%s", idStr, streamGetTaskStatusStr(curStatus));
  }
}