streamRecover.c 26.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
18 19
#include "wal.h"

L
liuyao 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
/**
 * Build a scan-history request for the given task and put it on the stream queue.
 *
 * @param pTask        task whose node/stream/task ids are written into the request
 * @param igUntreated  flag copied verbatim into the request
 * @return 0 when the message has been queued, -1 when the message buffer cannot
 *         be allocated or the message cannot be queued
 */
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  // The struct is copied byte-wise into the message below, so zero it first:
  // fields not set by the builder and any padding must not carry stack garbage.
  SStreamScanHistoryReq req = {0};
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);

  int32_t len = sizeof(SStreamScanHistoryReq);
  void*   serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
    // NOTE(review): the buffer is deliberately not released here, on the assumption that
    // tmsgPutToQueue disposes of pCont when it fails -- confirm against its implementation.
    qError("s-task:%s failed to put scan-history req into stream queue", pTask->id.idStr);
    return -1;
  }

  return 0;
}

40 41 42
const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
43
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
44
    case TASK_STATUS__HALT: return "halt";
45
    case TASK_STATUS__PAUSE: return "paused";
46
    case TASK_STATUS__DROPPING: return "dropping";
47 48 49
    default:return "";
  }
}
50

51 52
/**
 * Configure the executor of a source task for the first scan-history step and
 * queue the scan-history request.
 *
 * @return the result of streamStartRecoverTask()
 */
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SHistDataRange* pDataRange = &pTask->dataRange;
  SVersionRange*  pVerRange = &pDataRange->range;

  qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
         pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pVerRange->minVer, pVerRange->maxVer);

  // switch the operators to scan-history mode, then hand the version range and time window to the scanner
  streamSetParamForScanHistory(pTask);
  streamSetParamForStreamScannerStep1(pTask, pVerRange, &pDataRange->window);

  return streamStartRecoverTask(pTask, 0);
}
63

64 65 66 67 68 69 70 71 72 73
/**
 * Start the scan-history stage according to the level of the task:
 *  - source: launch the scan if the task is in scan-history status, otherwise only log;
 *  - agg:    set status to normal, set scan-history operator options and prepare the upstream counter;
 *  - sink:   set status to normal.
 *
 * @return the result of doLaunchScanHistoryTask() for a source task in scan-history
 *         status, 0 in every other case
 */
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamSetStatusNormal(pTask);
    streamSetParamForScanHistory(pTask);
    streamAggScanHistoryPrepare(pTask);
    return 0;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    streamSetStatusNormal(pTask);
    qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
    return 0;
  }

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  // source task
  if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
    return doLaunchScanHistoryTask(pTask);
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
  qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
         walReaderGetCurrentVer(pTask->exec.pWalReader));
  return 0;
}

86
// check status
87
/**
 * Send a check request to every downstream task of pTask.
 *
 * - fixed dispatch:   one request is sent; its id is remembered in pTask->checkReqId.
 * - shuffle dispatch: one request per vgroup is sent; the ids are collected in
 *                     pTask->checkReqIds and pTask->notReadyTasks is set to the number of vgroups.
 * - otherwise:        there is no downstream task, so the task is flagged as
 *                     downstream-ready and the scan-history stage is launched directly.
 *
 * @return 0 on success, -1 (terrno set) if the request-id list cannot be allocated
 */
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  STimeWindow*    pWindow = &pRange->window;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->checkReqId = req.reqId;

    qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
           "-%" PRId64 ", req:0x%" PRIx64,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
           pWindow->skey, pWindow->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    // allocate the id list before touching any task state, so a failure leaves the task unchanged
    SArray* pReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
    if (pReqIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("s-task:%s failed to allocate check-req id list for %d downstream tasks", pTask->id.idStr, numOfVgs);
      return -1;
    }

    pTask->notReadyTasks = numOfVgs;
    pTask->checkReqIds = pReqIds;

    qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
           pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.reqId = tGenIdPI64();
      taosArrayPush(pTask->checkReqIds, &req.reqId);
      req.downstreamNodeId = pVgInfo->vgId;
      req.downstreamTaskId = pVgInfo->taskId;
      qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
             req.downstreamTaskId, req.downstreamNodeId, i);
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    pTask->status.downstreamReady = 1;
    qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));

    streamTaskLaunchScanHistory(pTask);
  }

  return 0;
}

142
/**
 * Re-send a check request to the downstream task that produced pRsp.
 * The new request reuses the request id and all ids carried by the response.
 *
 * @return always 0
 */
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  SStreamTaskCheckReq req = {0};
  req.reqId = pRsp->reqId;
  req.streamId = pRsp->streamId;
  req.upstreamTaskId = pRsp->upstreamTaskId;
  req.upstreamNodeId = pRsp->upstreamNodeId;
  req.downstreamTaskId = pRsp->downstreamTaskId;
  req.downstreamNodeId = pRsp->downstreamNodeId;
  req.childId = pRsp->childId;

  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  // shuffle dispatch: the endpoint set comes from the vgroup entry carrying the same task id
  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgList);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    if (pVg->taskId != req.downstreamTaskId) {
      continue;
    }

    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVg->epSet);
  }

  return 0;
}

173 174
/**
 * Report whether the task is currently in normal status.
 *
 * @return 1 if the atomically loaded task status is TASK_STATUS__NORMAL, otherwise 0
 */
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
    return 1;
  }

  return 0;
}

177
/**
 * Handle the response of a downstream check request.
 *
 * If the downstream task is not ready (pRsp->status != 1) the function sleeps for
 * 100ms and sends the check again. Otherwise:
 *  - shuffle dispatch: the response must match one of the recorded request ids; the
 *    not-ready counter is decremented and, when it reaches zero, the id list is
 *    released and the scan-history stage is launched if the task is in scan-history status.
 *  - fixed dispatch: the response must match pTask->checkReqId; the task is flagged
 *    downstream-ready and the scan-history stage is launched if required.
 *
 * @return 0 on success, -1 if the response does not match any outstanding request
 */
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* id = pTask->id.idStr;

  if (pRsp->status != 1) {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
           pRsp->downstreamNodeId);
    taosMsleep(100);

    streamRecheckDownstream(pTask, pRsp);
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    bool found = false;

    int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
    for (int32_t i = 0; i < numOfReqs; i++) {
      int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
      if (reqId == pRsp->reqId) {
        found = true;
        break;
      }
    }

    if (!found) {
      return -1;
    }

    int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
    ASSERT(left >= 0);

    if (left == 0) {
      taosArrayDestroy(pTask->checkReqIds);
      pTask->checkReqIds = NULL;

      // NOTE(review): unlike the fixed-dispatch branch below, status.downstreamReady is not
      // set here -- confirm whether that is intended.
      if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
               numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
        streamTaskLaunchScanHistory(pTask);
      } else {
        ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
        // this is the shuffle-dispatch branch, so the message must not talk about a "fixed" downstream task
        qDebug("s-task:%s all %d downstream tasks are ready, now ready for data from wal, status:%s", id, numOfReqs,
               streamGetTaskStatusStr(pTask->status.taskStatus));
      }
    } else {
      int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
      qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
             pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
    }
  } else {
    ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH);
    if (pRsp->reqId != pTask->checkReqId) {
      return -1;
    }

    // set the downstream tasks have been checked flag
    ASSERT(pTask->status.downstreamReady == 0);
    pTask->status.downstreamReady = 1;

    ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
             streamGetTaskStatusStr(pTask->status.taskStatus));
      streamTaskLaunchScanHistory(pTask);
    } else {
      qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
             streamGetTaskStatusStr(pTask->status.taskStatus));
    }
  }

  return 0;
}

250
// common
251
/**
 * Switch the executor of the task to the operator options used for scan-history.
 *
 * @return the result of qSetStreamOperatorOptionForScanHistory()
 */
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);

  int32_t code = qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
  return code;
}
255

256
/**
 * Restore the operator options of the task executor after the scan-history stage.
 *
 * @return the result of qRestoreStreamOperatorOption()
 */
int32_t streamRestoreParam(SStreamTask* pTask) {
  qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr);

  int32_t code = qRestoreStreamOperatorOption(pTask->exec.pExecutor);
  return code;
}
260

261
/**
 * Move the task to TASK_STATUS__NORMAL unless it is being dropped.
 *
 * @return 0 if the status was stored, -1 if the task is in dropping state
 */
int32_t streamSetStatusNormal(SStreamTask* pTask) {
  int32_t prevStatus = atomic_load_8(&pTask->status.taskStatus);

  // a dropping task must never be revived
  if (prevStatus == TASK_STATUS__DROPPING) {
    qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
    return -1;
  }

  qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(prevStatus));
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  return 0;
}

// source
L
liuyao 已提交
274 275 276 277 278 279
/**
 * Pass the version range and time window of scan-history step 1 to the task executor.
 *
 * @return the result of qStreamSourceScanParamForHistoryScanStep1()
 */
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* exec = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep1(exec, pVerRange, pWindow);
}

/**
 * Pass the version range and time window of scan-history step 2 to the task executor.
 *
 * @return the result of qStreamSourceScanParamForHistoryScanStep2()
 */
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
  void* exec = pTask->exec.pExecutor;
  return qStreamSourceScanParamForHistoryScanStep2(exec, pVerRange, pWindow);
}

L
liuyao 已提交
282
/**
 * Fill a scan-history request with the identity of the task.
 *
 * @param pTask        source of the vgroup id, stream id and task id
 * @param pReq         request to fill; only the four fields below are written
 * @param igUntreated  flag stored in the request as is
 * @return always 0
 */
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  pReq->igUntreated = igUntreated;
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;
  pReq->msgHead.vgId = pTask->info.nodeId;

  return 0;
}

290
/**
 * Run the scan-history execution of a source task.
 *
 * @return the result of streamScanExec() called with a second argument of 100
 */
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
  int32_t code = streamScanExec(pTask, 100);
  return code;
}

294
/**
 * Tell every downstream task that this task has finished scanning history data.
 * One message is sent for fixed dispatch, one per vgroup for shuffle dispatch;
 * nothing is sent for any other output type.
 *
 * @return always 0
 */
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamScanHistoryFinishReq req = {0};
  req.streamId = pTask->id.streamId;
  req.childId = pTask->info.selfChildId;

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgList);

  qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
         numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));

  // the same request is reused, only the target task id changes per vgroup
  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    req.taskId = pVg->taskId;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

H
Haojun Liao 已提交
317 318 319 320 321 322
/**
 * Encode a transfer-state request and send it to one downstream vgroup.
 *
 * The message consists of an SMsgHead (vgId in network byte order) followed by the
 * encoded request. The request is encoded with tEncodeStreamScanHistoryFinishReq.
 *
 * @param pTask  sending task, used for logging only
 * @param pReq   request to encode
 * @param vgId   target vgroup id
 * @param pEpSet endpoint set of the target vgroup
 * @return 0 when the message has been handed to tmsgSendReq, -1 if the size
 *         calculation or the allocation fails, the encoder error code if encoding fails
 */
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
    // release the encoder on the failure path as well, not only after a successful encode
    tEncoderClear(&encoder);
    rpcFreeCont(buf);
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId);

  return 0;
}

/**
 * Send a transfer-state message to every downstream task: one for fixed
 * dispatch, one per vgroup for shuffle dispatch, none for other output types.
 *
 * @return always 0
 */
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = {0};
  req.streamId = pTask->id.streamId;
  req.childId = pTask->info.selfChildId;

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
    return 0;
  }

  if (pTask->outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return 0;
  }

  SArray* pVgList = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t numOfVgs = taosArrayGetSize(pVgList);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
    req.taskId = pVg->taskId;
    doDispatchTransferMsg(pTask, &req, pVg->vgId, &pVg->epSet);
  }

  return 0;
}

381
// agg
382
/**
 * Initialise the counter of upstream tasks an agg task still has to wait for
 * with the size of its upstream endpoint list.
 *
 * @return always 0
 */
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
  SArray* pUpstreamList = pTask->pUpstreamEpInfoList;
  pTask->numOfWaitingUpstream = taosArrayGetSize(pUpstreamList);

  qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
         pTask->numOfWaitingUpstream);

  return 0;
}

389
/**
 * Called once all upstream tasks finished their history scan: restore the
 * operator options of the executor, then mark the recover as finished.
 *
 * @return 0 on success, -1 if either executor call fails (the second call is
 *         skipped when the first one fails)
 */
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  if (qRestoreStreamOperatorOption(pExecutor) < 0) {
    return -1;
  }

  return (qStreamRecoverFinish(pExecutor) < 0) ? -1 : 0;
}

401
/**
 * Process a scan-history-finished notification from an upstream task.
 * Only agg tasks react: the waiting counter is decremented and, once it reaches
 * zero, the executor is switched back to stream processing.
 *
 * @param taskId  id of the reporting upstream task (logging only)
 * @param childId child index of the reporting upstream task (logging only)
 * @return always 0
 */
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
  if (pTask->info.taskLevel != TASK_LEVEL__AGG) {
    return 0;
  }

  int32_t remaining = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(remaining >= 0);

  if (remaining != 0) {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, taskId, childId, remaining);
    return 0;
  }

  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
         pTask->id.idStr, numOfUpstream);

  streamAggUpstreamScanHistoryFinish(pTask);
  return 0;
}

422 423 424 425
/**
 * Set the version range of the fill-history task to [0, currentVer of pTask]
 * and start checking the downstream tasks of the fill-history task.
 *
 * @param pTask  the stream task that owns the fill-history task
 * @param pHTask the fill-history task to be launched
 */
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  SHistDataRange* pHRange = &pHTask->dataRange;

  pHRange->range.minVer = 0;
  pHRange->range.maxVer = pTask->chkInfo.currentVer;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
  } else {
    qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
           " ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pHTask->id.idStr, pHRange->window.skey, pHRange->window.ekey, pHRange->range.minVer,
           pHRange->range.maxVer);
  }

  // check if downstream tasks have been ready
  streamTaskCheckDownstreamTasks(pHTask);
}

H
Haojun Liao 已提交
439 440 441 442 443
// Parameter block handed to the tryLaunchHistoryTask timer callback.
// It is heap-allocated by streamCheckHistoryTaskDownstream and freed by the callback
// (or by the allocator itself if the timer cannot be started).
typedef struct SStreamTaskRetryInfo {
  SStreamMeta* pMeta;   // meta used by the callback to look the task up again
  int32_t taskId;       // id of the stream task whose history task is to be launched
} SStreamTaskRetryInfo;

444
static void tryLaunchHistoryTask(void* param, void* tmrId) {
H
Haojun Liao 已提交
445 446 447
  SStreamTaskRetryInfo* pInfo = param;
  SStreamMeta*          pMeta = pInfo->pMeta;

448
  qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
H
Haojun Liao 已提交
449 450 451 452 453

  taosWLockLatch(&pMeta->lock);
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
  if (ppTask) {
    ASSERT((*ppTask)->status.timerActive == 1);
H
Haojun Liao 已提交
454

H
Haojun Liao 已提交
455
    if (streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
456 457 458
      const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
      qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);

H
Haojun Liao 已提交
459
      taosMemoryFree(pInfo);
H
Haojun Liao 已提交
460 461 462 463 464 465
      (*ppTask)->status.timerActive = 0;
      taosWUnLockLatch(&pMeta->lock);
      return;
    }
  }
  taosWUnLockLatch(&pMeta->lock);
466

H
Haojun Liao 已提交
467 468 469
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
  if (pTask != NULL) {
    ASSERT(pTask->status.timerActive == 1);
470

H
Haojun Liao 已提交
471 472
    // abort the timer if intend to stop task
    SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
473 474 475
    if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
      const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
      qWarn(
476 477
          "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
          "destroyed, or should stop",
478
          pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
479

480
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
481
      streamMetaReleaseTask(pMeta, pTask);
H
Haojun Liao 已提交
482 483 484
      return;
    }

485 486 487 488
    if (pHTask != NULL) {
      doCheckDownstreamStatus(pTask, pHTask);
      streamMetaReleaseTask(pMeta, pHTask);
    }
H
Haojun Liao 已提交
489 490 491 492 493

    // not in timer anymore
    pTask->status.timerActive = 0;
    streamMetaReleaseTask(pMeta, pTask);
  } else {
494
    qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
H
Haojun Liao 已提交
495
  }
H
Haojun Liao 已提交
496 497

  taosMemoryFree(pInfo);
498 499 500 501
}

// todo: fix the race condition in this function.
// A fill-history task needs to be started.
502
/**
 * Start the downstream check of the fill-history task related to pTask.
 *
 * If the history task is already registered in the meta, its version range is set
 * and its downstream tasks are checked right away. If it is not built yet, a timer
 * (100ms) is armed that retries the launch through tryLaunchHistoryTask.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (terrno set) if the retry
 *         info for the timer cannot be allocated
 */
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
          pMeta->vgId, hTaskId);

    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
    if (pInfo == NULL) {
      // without the retry info the timer callback cannot find the task again, so give up here
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("s-task:%s failed to allocate retry info to launch history task:0x%x", pTask->id.idStr, hTaskId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInfo->taskId = pTask->id.taskId;
    pInfo->pMeta = pTask->pMeta;

    if (pTask->launchTaskTimer == NULL) {
      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask,  100, pInfo, streamEnv.timer);
      if (pTask->launchTaskTimer == NULL) {
        // todo failed to create timer
        taosMemoryFree(pInfo);
      } else {
        pTask->status.timerActive = 1;  // timer is active
        qDebug("s-task:%s set timer active flag", pTask->id.idStr);
      }
    } else {  // timer exists
      pTask->status.timerActive = 1;
      qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
    }

    // try again in 100ms
    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}

539 540 541 542 543 544 545 546 547 548 549 550 551
/**
 * Finish the scan-history stage of a task: restore the operator options, notify
 * all downstream tasks, switch the task to normal status, mark it inactive for
 * scheduling and persist it through the meta.
 *
 * @return 0 on success or when the task is being dropped, -1 if restoring the
 *         options or dispatching the finish message fails
 */
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  // nothing to do for a task that is going away
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchScanHistoryFinishMsg(pTask) < 0) {
    return -1;
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);

  // ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // persist the task while holding the meta write lock
  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}

L
liuyao 已提交
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584
/** Forward to qStreamRecoverScanStep1Finished() for the executor of the task. */
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep1Finished(pTask->exec.pExecutor);
}

/** Forward to qStreamRecoverScanStep2Finished() for the executor of the task. */
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
  return qStreamRecoverScanStep2Finished(pTask->exec.pExecutor);
}

/** Forward to qStreamRecoverSetAllStepFinished() for the executor of the task. */
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
  return qStreamRecoverSetAllStepFinished(pTask->exec.pExecutor);
}

585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
/**
 * Prepare the version range for the secondary history scan (step 2).
 *
 * With latestVer being the current version of the WAL reader, the new range is
 * [maxVer + 1, latestVer - 1]. If that range is empty, all scan steps are marked
 * as finished and the range is left untouched.
 */
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
  SVersionRange* pVerRange = &pTask->dataRange.range;

  int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
  ASSERT(latestVer >= pVerRange->maxVer);

  int64_t firstVer = pVerRange->maxVer + 1;
  int64_t lastVer = latestVer - 1;

  if (firstVer <= lastVer) {
    // 2. do secondary scan of the history data, the time window remains, and the version range is updated
    pVerRange->minVer = firstVer;
    pVerRange->maxVer = lastVer;
    return;
  }

  // no input data yet. no need to execute the secondary scan while stream task halt
  streamTaskRecoverSetAllStepFinished(pTask);
  qDebug(
      "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
      pTask->id.idStr);
}


606
/**
 * Encode a task check request.
 * Field order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId.
 *
 * @return the encoder position after encoding, or -1 as soon as one step fails
 */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  // '||' short-circuits, so the fields are written in order and encoding stops at the first failure
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

619
/**
 * Decode a task check request; the field order mirrors tEncodeStreamTaskCheckReq.
 *
 * @return 0 on success, -1 as soon as one step fails
 */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  // '||' short-circuits, so the fields are read in order and decoding stops at the first failure
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

632
/**
 * Encode a task check response.
 * Field order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId, status.
 *
 * @return the encoder position after encoding, or -1 as soon as one step fails
 */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  // '||' short-circuits, so the fields are written in order and encoding stops at the first failure
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pRsp->reqId) < 0 ||
      tEncodeI64(pEncoder, pRsp->streamId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pRsp->childId) < 0 ||
      tEncodeI8(pEncoder, pRsp->status) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

646
/**
 * Decode a task check response; the field order mirrors tEncodeStreamTaskCheckRsp.
 *
 * @return 0 on success, -1 as soon as one step fails
 */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  // '||' short-circuits, so the fields are read in order and decoding stops at the first failure
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pRsp->reqId) < 0 ||
      tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->childId) < 0 ||
      tDecodeI8(pDecoder, &pRsp->status) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

660
/**
 * Encode a scan-history-finish request. Field order: streamId, taskId, childId.
 *
 * @return the encoder position after encoding, or -1 as soon as one step fails
 */
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
  // '||' short-circuits, so the fields are written in order and encoding stops at the first failure
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->childId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
668
/**
 * Decode a scan-history-finish request; the field order mirrors
 * tEncodeStreamScanHistoryFinishReq.
 *
 * @return 0 on success, -1 as soon as one step fails
 */
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
  // '||' short-circuits, so the fields are read in order and decoding stops at the first failure
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI32(pDecoder, &pReq->taskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->childId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
676

H
Haojun Liao 已提交
677
// todo handle race condition, this task may be destroyed
678 679 680 681 682 683 684
/**
 * Entry point for checking downstream readiness of a task.
 *
 * A fill-history task does nothing here; it waits to be launched by its stream task.
 * For a stream task with a related fill-history task, the history task's downstream
 * check is started first, then the stream task's own range is updated:
 *   time window   -> [old ekey + 1, INT64_MAX]
 *   version range -> [0, old minVer]
 * Finally the downstream tasks of the stream task itself are checked.
 */
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
    return;
  }

  SHistDataRange* pRange = &pTask->dataRange;

  // calculate the correct start time window, and start the handle the history data for the main task.
  if (pTask->historyTaskId.taskId == 0) {
    qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
           ", ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
  } else {
    // check downstream tasks for associated scan-history-data tasks
    streamCheckHistoryTaskDownstream(pTask);

    // launch current task: remember the old bounds before overwriting them
    int64_t nextSkey = pRange->window.ekey + 1;
    int64_t prevMinVer = pRange->range.minVer;

    pRange->window.skey = nextSkey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = prevMinVer;

    qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
           ", ver range:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }

  ASSERT(pTask->status.downstreamReady == 0);

  // check downstream tasks for itself
  streamTaskCheckDownstreamTasks(pTask);
}