streamRecover.c 26.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

L
liuyao 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
// Build a scan-history request for pTask and post it to the stream queue.
//
// pTask        task whose ids and message callback (pMsgCb) are used
// igUntreated  copied into the request's igUntreated field
//
// Returns 0 when the request has been queued, -1 when the RPC buffer cannot be
// allocated or the message cannot be put into the queue.
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  // The request is copied byte-for-byte into the RPC buffer below, so zero it first:
  // fields the builder does not set must not carry stack garbage.
  SStreamScanHistoryReq req = {0};
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);

  int32_t len = sizeof(SStreamScanHistoryReq);
  void*   serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    // NOTE(review): the buffer is not released here on the assumption that tmsgPutToQueue
    // disposes of pCont when it fails -- confirm against its implementation.
    qError("s-task:%s failed to put scan-history request into stream queue", pTask->id.idStr);
    return -1;
  }

  return 0;
}

40 41 42
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
43
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
44
    case TASK_STATUS__HALT: return "halt";
45
    case TASK_STATUS__PAUSE: return "paused";
46
    case TASK_STATUS__DROPPING: return "dropping";
47 48 49
    default:return "";
  }
}
50

51 52
// Log the task's version range, apply the scan-history parameters (including the
// step-1 scanner range/window) and queue the scan-history request.
// Returns whatever streamStartRecoverTask() returns.
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pVerRange = &pTask->dataRange.range;
  const char*    statusStr = streamGetTaskStatusStr(pTask->status.taskStatus);

  qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
         pTask->info.nodeId, statusStr, pVerRange->minVer, pVerRange->maxVer);

  streamSetParamForScanHistory(pTask);
  streamSetParamForStreamScannerStep1(pTask, pVerRange, &pTask->dataRange.window);

  return streamStartRecoverTask(pTask, 0);
}
63

64 65 66 67 68 69 70 71 72 73
// Start the scan-history stage according to the task level:
//   - agg:    switch to normal status, set scan-history params, start waiting for upstreams
//   - sink:   switch to normal status immediately
//   - source: launch the scan-history task when in scan-history status, otherwise only log
// Returns the launch result for a source task in scan-history status, 0 in all other cases.
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamSetStatusNormal(pTask);
    streamSetParamForScanHistory(pTask);
    streamAggScanHistoryPrepare(pTask);
    return 0;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    streamSetStatusNormal(pTask);
    qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
    return 0;
  }

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
    return doLaunchScanHistoryTask(pTask);
  }

  // a source task that is not scanning history is expected to be in normal status
  ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
  qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
         walReaderGetCurrentVer(pTask->exec.pWalReader));
  return 0;
}

86
// check status
87
// Send a check request to every downstream task of pTask.
//
//   - fixed dispatch:   one request; its id is remembered in pTask->checkReqId
//   - shuffle dispatch: one request per vgroup; the ids are collected in pTask->checkReqIds
//                       and pTask->notReadyTasks is set to the number of vgroups
//   - otherwise:        no downstream exists, so the task is marked ready and the
//                       scan-history stage is launched right away
//
// Returns 0 on success, -1 (terrno set) when the request-id list cannot be allocated.
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  STimeWindow*    pWindow = &pRange->window;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    // allocate the id list before touching any task state, so a failure leaves the task unchanged
    SArray* pReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
    if (pReqIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("s-task:%s failed to allocate check request id list for %d downstream tasks", pTask->id.idStr, numOfVgs);
      return -1;
    }

    pTask->notReadyTasks = numOfVgs;
    pTask->checkReqIds = pReqIds;

    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    pTask->status.downstreamReady = 1;
    qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));

    streamTaskLaunchScanHistory(pTask);
  }

  return 0;
}

142
// Re-send the check request described by a previous response (pRsp) to the same
// downstream task. For shuffle dispatch the request goes to every vgroup entry whose
// taskId matches the downstream task id. Always returns 0.
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {
      .reqId = pRsp->reqId,
      .streamId = pRsp->streamId,
      .upstreamTaskId = pRsp->upstreamTaskId,
      .upstreamNodeId = pRsp->upstreamNodeId,
      .downstreamTaskId = pRsp->downstreamTaskId,
      .downstreamNodeId = pRsp->downstreamNodeId,
      .childId = pRsp->childId,
  };

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgroups);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
    if (pVg->taskId != req.downstreamTaskId) {
      continue;
    }

    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVg->epSet);
  }

  return 0;
}

173 174
// Returns 1 when the task status (read atomically) is normal, 0 otherwise.
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
    return 1;
  }

  return 0;
}

177
// Handle the response to a downstream check request.
//
//   - status != 1: the downstream is not ready; sleep 100ms and send the check again
//   - fixed dispatch: the response must carry pTask->checkReqId; the task is then marked
//     downstream-ready and scan-history is launched when the task is in that status
//   - shuffle dispatch: the response id must be in pTask->checkReqIds; the not-ready
//     counter is decremented and, once it reaches zero, the id list is destroyed, the task
//     is marked downstream-ready and scan-history is launched when applicable
//
// Returns 0, or -1 when the response's request id is not one this task issued.
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* id = pTask->id.idStr;

  if (pRsp->status != 1) {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH);
    if (pRsp->reqId != pTask->checkReqId) {
      return -1;
    }

    // set the downstream tasks have been checked flag
    ASSERT(pTask->status.downstreamReady == 0);
    pTask->status.downstreamReady = 1;

    ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
             streamGetTaskStatusStr(pTask->status.taskStatus));
      streamTaskLaunchScanHistory(pTask);
    } else {
      qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
             streamGetTaskStatusStr(pTask->status.taskStatus));
    }

    return 0;
  }

  // shuffle dispatch: the response must answer one of the requests recorded earlier
  int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
  int32_t pos = 0;
  for (; pos < numOfReqs; ++pos) {
    int64_t* pReqId = (int64_t*)taosArrayGet(pTask->checkReqIds, pos);
    if (*pReqId == pRsp->reqId) {
      break;
    }
  }

  if (pos >= numOfReqs) {
    return -1;
  }

  int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
  ASSERT(left >= 0);

  if (left != 0) {
    int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
           pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
    return 0;
  }

  // every downstream task has answered: the id list is no longer needed
  taosArrayDestroy(pTask->checkReqIds);
  pTask->checkReqIds = NULL;
  pTask->status.downstreamReady = 1;

  if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
    qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
           numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
    streamTaskLaunchScanHistory(pTask);
  } else {
    ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
    qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
           streamGetTaskStatusStr(pTask->status.taskStatus));
  }

  return 0;
}

251
// common
252
// Log the switch, then apply the scan-history operator option to the task's executor.
// Returns the result of qSetStreamOperatorOptionForScanHistory().
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);

  int32_t code = qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
  return code;
}
256

257
// Log the switch, then restore the executor's operator option after scan-history.
// Returns the result of qRestoreStreamOperatorOption().
int32_t streamRestoreParam(SStreamTask* pTask) {
  qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr);

  int32_t code = qRestoreStreamOperatorOption(pTask->exec.pExecutor);
  return code;
}
261

262
// Atomically set the task status to normal unless the task is being dropped.
// Returns 0 on success, -1 when the task is in dropping status (status left unchanged).
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  int32_t prevStatus = atomic_load_8(&pTask->status.taskStatus);

  if (prevStatus == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  }

  qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(prevStatus));
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  return 0;
}

// source
L
liuyao 已提交
275 276 277 278 279 280
// Forward the version range and time window for scan-history step 1 to the task's executor.
// Returns the result of qStreamSourceScanParamForHistoryScanStep1().
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep1(pExecutor, pVerRange, pWindow);
}

// Forward the version range and time window for scan-history step 2 to the task's executor.
// Returns the result of qStreamSourceScanParamForHistoryScanStep2().
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
  void* pExecutor = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep2(pExecutor, pVerRange, pWindow);
}

L
liuyao 已提交
283
// Fill a scan-history request from the task's identity: vgroup id in the message head,
// stream id, task id, plus the caller-supplied igUntreated flag. Always returns 0.
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  pReq->igUntreated = igUntreated;
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;
  pReq->msgHead.vgId = pTask->info.nodeId;

  return 0;
}

291
// Run the scan executor for this task; the second argument passed to streamScanExec is 100.
// Returns the result of streamScanExec().
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  int32_t code = streamScanExec(pTask, 100);
  return code;
}

295
// Notify every downstream task that this task has finished scanning history data.
// Fixed dispatch sends one message; shuffle dispatch sends one per vgroup. Always returns 0.
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgroups);

  qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
         numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
    req.taskId = pVg->taskId;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

H
Haojun Liao 已提交
318 319 320 321 322 323
// Encode a transfer-state request and send it to one downstream vgroup.
//
// pTask   sending task (used for logging only)
// pReq    request to encode; it is encoded with tEncodeStreamScanHistoryFinishReq
// vgId    destination vgroup id, written into the message head in network byte order
// pEpSet  endpoint set the message is sent to
//
// Returns 0 once the message has been handed to tmsgSendReq, -1 when sizing or buffer
// allocation fails (terrno set on allocation failure), or the negative encoder result
// when encoding fails.
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  int32_t code = -1;
  int32_t tlen = 0;

  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq);
  // clear the encoder on both the success and the failure path
  tEncoderClear(&encoder);

  if (code < 0) {
    rpcFreeCont(buf);
    return code;
  }

  SRpcMsg msg = {0};
  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId);

  return 0;
}

// Send a transfer-state message to every downstream task: one message for fixed
// dispatch, one per vgroup for shuffle dispatch. Always returns 0.
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgroups);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
    req.taskId = pVg->taskId;
    doDispatchTransferMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

382
// agg
383
// Record how many upstream tasks the agg task has to wait for: the size of its
// upstream endpoint list. Always returns 0.
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  pTask->numOfWaitingUpstream = numOfUpstream;

  qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
         pTask->numOfWaitingUpstream);
  return 0;
}

390
// Restore the executor's operator option and then mark stream recovery finished.
// Returns 0 on success, -1 as soon as either step reports a negative result
// (the second step is skipped when the first fails).
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  if (qRestoreStreamOperatorOption(pExecutor) < 0 || qStreamRecoverFinish(pExecutor) < 0) {
    return -1;
  }

  return 0;
}

402
// Handle a "scan-history finished" notification from an upstream task.
// Only agg tasks react: the waiting counter is decremented atomically and, when it
// reaches zero, the agg task's parameters are restored for stream data.
// taskId/childId identify the sender and are used for logging. Always returns 0.
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
  if (pTask->info.taskLevel != TASK_LEVEL__AGG) {
    return 0;
  }

  int32_t remaining = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(remaining >= 0);

  if (remaining != 0) {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, taskId, childId, remaining);
    return 0;
  }

  int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
         pTask->id.idStr, numOfTasks);

  streamAggUpstreamScanHistoryFinish(pTask);
  return 0;
}

423 424 425 426
// Set the version range of the history task pHTask to [0, pTask's current checkpoint
// version], log the launch condition, then start checking pHTask's downstream tasks.
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  SHistDataRange* pHRange = &pHTask->dataRange;

  pHRange->range.minVer = 0;
  pHRange->range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  } else {
    qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHRange->window.skey, pHRange->window.ekey, pHRange->range.minVer,
           pHRange->range.maxVer);
  }

  // check if downstream tasks have been ready
  streamTaskCheckDownstreamTasks(pHTask);
}

H
Haojun Liao 已提交
440 441 442 443 444
// Context handed to the tryLaunchHistoryTask timer callback.
// It is heap-allocated by streamCheckHistoryTaskDownstream and freed by the callback
// (or by the creator when the timer cannot be started).
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;   // meta used to look the task up again when the timer fires
  int32_t taskId;       // id of the task whose history task should be launched
} SStreamTaskRetryInfo;

445
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
446 447 448
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

449
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
450 451 452 453 454

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
455

H
Haojun Liao 已提交
456
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
457 458 459
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
460
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
461 462 463 464 465 466
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
467

H
Haojun Liao 已提交
468 469 470
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
471

H
Haojun Liao 已提交
472 473
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
474 475 476
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
477 478
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
479
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
480

481
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
482
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
483 484 485
      return;
    }

486 487 488 489
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
490 491 492 493 494

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
495
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
496
  }
H
Haojun Liao 已提交
497 498

  taosMemoryFree(pInfo);
499 500 501 502
}

// TODO: fix bug 2: race condition.
// A fill-history task needs to be started.
503
// Start the downstream check for the history task associated with pTask.
//
// When the history task is already registered in the meta's task table, its launch
// condition is set and its downstream tasks are checked immediately. When it is not
// built yet, a timer (100ms) is started or reset so that tryLaunchHistoryTask retries.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (terrno set) when the retry
// context cannot be allocated.
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
  if (pHTask != NULL) {
    doCheckDownstreamStatus(pTask, *pHTask);
    return TSDB_CODE_SUCCESS;
  }

  qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
        pMeta->vgId, hTaskId);

  SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
  if (pInfo == NULL) {
    // without the retry context no timer can be armed; report instead of dereferencing NULL
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("s-task:%s failed to allocate retry info for launching history task:0x%x", pTask->id.idStr, hTaskId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pInfo->taskId = pTask->id.taskId;
  pInfo->pMeta = pTask->pMeta;

  if (pTask->launchTaskTimer == NULL) {
    pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
    if (pTask->launchTaskTimer == NULL) {
      // todo failed to create timer
      taosMemoryFree(pInfo);
    } else {
      pTask->status.timerActive = 1;  // timer is active
      qDebug("s-task:%s set timer active flag", pTask->id.idStr);
    }
  } else {  // timer exists
    pTask->status.timerActive = 1;
    qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
    taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
  }

  // the timer callback tries again in 100ms
  return TSDB_CODE_SUCCESS;
}

540 541 542 543 544 545 546 547 548 549 550 551 552
// Finish the scan-history stage of a task: restore the executor parameters, tell the
// downstream tasks, switch the task to normal status with an inactive schedule status,
// and save the task in the meta under the meta write lock.
// Returns 0 on success or when the task is being dropped, -1 when restoring the
// parameters or dispatching the finish message fails.
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  SStreamMeta* pMeta = pTask->pMeta;

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchScanHistoryFinishMsg(pTask) < 0) {
    return -1;
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);

  // ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}

L
liuyao 已提交
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585
// Returns what the executor reports for "recover scan step 1 finished".
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep1Finished(pTask->exec.pExecutor);
}

// Returns what the executor reports for "recover scan step 2 finished".
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep2Finished(pTask->exec.pExecutor);
}

// Ask the executor to mark all recover steps as finished; returns its result.
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  return qStreamRecoverSetAllStepFinished(pTask->exec.pExecutor);
}

586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
// Prepare the version range for the secondary scan (step 2) of a history task.
// The new range is [previous maxVer + 1, current WAL reader version - 1]; when that
// range is empty, all recover steps are marked finished instead.
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
  SVersionRange* pVerRange = &pTask->dataRange.range;
  int64_t        latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
  ASSERT(latestVer >= pVerRange->maxVer);

  int64_t nextStartVer = pVerRange->maxVer + 1;
  int64_t nextEndVer = latestVer - 1;

  if (nextStartVer <= nextEndVer) {
    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
    // [pTask->dataRange.range.maxVer, ver1]
    pVerRange->minVer = nextStartVer;
    pVerRange->maxVer = nextEndVer;
    return;
  }

  // no input data yet. no need to execute the secondary scan while stream task halt
  streamTaskRecoverSetAllStepFinished(pTask);
  qDebug(
      "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
      pTask->id.idStr);
}


607
// Encode a task check request. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId.
// Returns the encoder position on success, -1 on the first failing step.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // || short-circuits, so encoding stops at the first failure
  if (tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

620
// Decode a task check request. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId.
// Returns 0 on success, -1 on the first failing step.
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // || short-circuits, so decoding stops at the first failure
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

633
// Encode a task check response. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId, status (8-bit).
// Returns the encoder position on success, -1 on the first failing step.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // || short-circuits, so encoding stops at the first failure
  if (tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
      tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

647
// Decode a task check response. Field order: reqId, streamId, upstreamNodeId,
// upstreamTaskId, downstreamNodeId, downstreamTaskId, childId, status (8-bit).
// Returns 0 on success, -1 on the first failing step.
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // || short-circuits, so decoding stops at the first failure
  if (tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
      tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

661
// Encode a scan-history-finish request. Field order: streamId, taskId, childId.
// Returns the encoder position on success, -1 on the first failing step.
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // || short-circuits, so encoding stops at the first failure
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
669
// Decode a scan-history-finish request. Field order: streamId, taskId, childId.
// Returns 0 on success, -1 on the first failing step.
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // || short-circuits, so decoding stops at the first failure
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->taskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
677

H
Haojun Liao 已提交
678
// todo handle race condition, this task may be destroyed
679 680 681 682 683 684 685
// Prepare a task and start checking its downstream tasks.
//   - a fill-history task does nothing here; it waits to be launched
//   - a task with an associated history task first starts that task's downstream check,
//     then moves its own time window to (old ekey + 1, INT64_MAX) and its version range
//     to [0, old minVer]
//   - finally the task checks its own downstream tasks
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
    return;
  }

  SHistDataRange* pRange = &pTask->dataRange;

  // calculate the correct start time window, and start the handle the history data for the main task.
  if (pTask->historyTaskId.taskId == 0) {
    qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
           ", ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
  } else {
    // check downstream tasks for associated scan-history-data tasks
    streamCheckHistoryTaskDownstream(pTask);

    // launch current task
    int64_t nextSkey = pRange->window.ekey + 1;
    int64_t prevMinVer = pRange->range.minVer;

    pRange->window.skey = nextSkey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = prevMinVer;

    qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
           ", ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }

  ASSERT(pTask->status.downstreamReady == 0);

  // check downstream tasks for itself
  streamTaskCheckDownstreamTasks(pTask);
}