streamExec.c 27.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
/*
 * Batching limits of the stream executor. One queued batch entry may itself carry several submit blocks.
 */
#define MAX_STREAM_EXEC_BATCH_NUM        32  /* max number of input-queue items merged into one execution batch */
#define MIN_STREAM_EXEC_BATCH_NUM        4   /* not referenced in this file; presumably a lower batch bound */
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 /* result blocks collected before an intermediate dump downstream */
5
54liuyao 已提交
22

Y
yihaoDeng 已提交
23
/* File-local helpers that are used before their definitions further down in this file. */
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
static int32_t updateCheckPointInfo(SStreamTask* pTask);
25

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
33
  return (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT);
L
liuyao 已提交
34 35
}

36 37
/*
 * Persist any advanced checkpoint info, then wrap the result blocks collected in pRes into one stream data block
 * and hand it to streamTaskOutputResultBlock().
 *
 * pTask       - the executing task.
 * pItem       - the input item the results were produced from (passed to createStreamBlockFromResults()).
 * pRes        - array of SSDataBlock results. On every path it is either destroyed here or passed on to
 *               createStreamBlockFromResults().
 * size        - approximate byte size of the results in pRes.
 * totalSize   - in/out: incremented by size when the results are output.
 * totalBlocks - in/out: incremented by the number of blocks in pRes when the results are output.
 *
 * Returns TSDB_CODE_SUCCESS; the error from updateCheckPointInfo(); or -1 when the data block cannot be created
 * or the output queue reports TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY (back pressure).
 */
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks <= 0) {
    // nothing to output: just release the (empty) result list
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOutput = createStreamBlockFromResults(pItem, pTask, size, pRes);
  if (pOutput == NULL) {
    qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
         size / 1048576.0);

  code = streamTaskOutputResultBlock(pTask, pOutput);
  if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
    destroyStreamDataBlock(pOutput);
    return -1;
  }

  *totalBlocks += numOfBlocks;
  *totalSize += size;
  return TSDB_CODE_SUCCESS;
}

Y
yihaoDeng 已提交
71 72
/*
 * Run the task's executor on the input already attached to it and dump the produced result blocks.
 *
 * Result blocks are accumulated in a local array and handed to doDumpResult() whenever
 * MAX_STREAM_RESULT_DUMP_THRESHOLD blocks are collected, and once more when the executor returns no more output.
 *
 * pTask       - the executing task.
 * pItem       - the input queue item fed to the executor; used when building result blocks.
 * totalSize   - out: accumulated (approximate) byte size of the dumped results; reset to 0 on entry.
 * totalBlocks - out: number of dumped result blocks; reset to 0 on entry.
 *
 * Returns TSDB_CODE_SUCCESS (also when the task is stopped), TSDB_CODE_OUT_OF_MEMORY when the result array cannot
 * be allocated, or the error returned by doDumpResult().
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    // (re)allocate the result list; it is consumed by doDumpResult() after every intermediate dump
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {  // taosArrayPush() below would otherwise dereference a NULL array
        qError("s-task:%s failed to init result block array, out of memory", pTask->id.idStr);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      // NOTE(review): the executor is retried unconditionally; a persistent error keeps this loop spinning until
      // the task is stopped.
      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      // executor exhausted. For a retrieve input, append a STREAM_PULL_OVER block copied from the single block of
      // the retrieve request.
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock             block = {0};
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->info.selfChildId;
        taosArrayPush(pRes, &block);
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64 " dump results", pTask->id.idStr,
               pTask->info.selfChildId, pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      // STREAM_RETRIEVE blocks are broadcast to the child tasks and are not added to the local results
      if (streamBroadcastToChildren(pTask, output) < 0) {
        qError("s-task:%s failed to broadcast retrieve block to child tasks, since %s", pTask->id.idStr, terrstr());
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->info.selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // pRes has been consumed by doDumpResult(); a fresh list is allocated at the top of the loop
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}

166
/*
 * Scan history data for a source task, forwarding results downstream in rounds of at most batchSize blocks.
 *
 * Each round pulls blocks from the executor until it has no more output (then
 * qStreamRecoverScanFinished() decides whether scanning is finished) or batchSize blocks are collected. The
 * collected blocks are wrapped into a STREAM_INPUT__DATA_BLOCK item and passed to streamTaskOutputResultBlock().
 * Scanning ends early when the task should pause, and returns immediately when it should stop.
 *
 * Returns 0 on completion, pause or stop; -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) on allocation failure; or
 * TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output queue rejects a batch.
 */
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  void*   exec = pTask->exec.pExecutor;
  int32_t code = TSDB_CODE_SUCCESS;
  bool    finished = false;

  qSetStreamOpOpen(exec);

  while (!finished) {
    if (streamTaskShouldPause(&pTask->status)) {
      double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
      qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
      break;
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // collect one round of result blocks from the executor
    int32_t numOfBlocks = 0;
    for (;;) {
      if (streamTaskShouldStop(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      code = qExecTask(exec, &output, &ts);

      bool failed = (code != TSDB_CODE_SUCCESS) && (code != TSDB_CODE_TSC_QUERY_KILLED);
      if (failed) {
        qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
        continue;
      }

      // the generated results before fill-history task been paused, should be dispatched to sink node
      if (output == NULL) {
        finished = qStreamRecoverScanFinished(exec);
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->info.selfChildId;
      taosArrayPush(pRes, &block);

      numOfBlocks += 1;
      if (numOfBlocks >= batchSize) {
        qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize);
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      taosArrayDestroy(pRes);
      continue;
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;

    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }
  }

  return 0;
}
L
Liu Jicong 已提交
243

Y
yihaoDeng 已提交
244
/*
 * Query the executor for its latest checkpoint id and data version. If the id is newer than the one in
 * pTask->chkInfo, store it (keeping currentVer) and persist the task into the stream meta under the meta write latch.
 *
 * Returns TSDB_CODE_SUCCESS when nothing changed or the commit succeeded, -1 when streamMetaCommit() fails.
 */
int32_t updateCheckPointInfo(SStreamTask* pTask) {
  int64_t checkpointId = 0;
  int64_t checkpointVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &checkpointVer, &checkpointId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (checkpointId <= pInfo->id) {
    return TSDB_CODE_SUCCESS;  // checkpoint not advanced: nothing to save
  }

  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64,
         pTask->id.idStr, pInfo->version, checkpointVer, pInfo->id, checkpointId);

  pTask->chkInfo = (SCheckpointInfo){.version = checkpointVer, .id = checkpointId, .currentVer = pInfo->currentVer};

  SStreamMeta* pMeta = pTask->pMeta;
  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  int32_t commitRes = streamMetaCommit(pMeta);
  taosWUnLockLatch(&pMeta->lock);

  if (commitRes < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

273
/*
 * Block until pStreamTask reports idle (see streamTaskIsIdle()), polling every 100ms, then log the time waited.
 * pTask is only used for logging.
 */
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  const int64_t startMs = taosGetTimestampMs();

  for (; !streamTaskIsIdle(pStreamTask); taosMsleep(100)) {
    qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
           pTask->info.taskLevel, pStreamTask->id.idStr);
  }

  double elapsedSec = (taosGetTimestampMs() - startMs) / 1000.0;
  if (elapsedSec > 0) {
    qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", pTask->id.idStr, pStreamTask->id.idStr,
           elapsedSec);
  }
}

290
/*
 * Append an empty STREAM_DELETE_DATA block, wrapped in a STREAM_INPUT__REF_DATA_BLOCK queue item, to the input
 * queue of pStreamTask. Allocation failures are logged and nothing is appended.
 */
static void appendDummyDeleteBlock(SStreamTask* pStreamTask) {
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    qError("s-task:%s failed to allocate queue item for dummy delete block", pStreamTask->id.idStr);
    return;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    qError("s-task:%s failed to create dummy delete block", pStreamTask->id.idStr);
    taosFreeQitem(pItem);
    return;
  }

  pDelBlock->info.rows = 0;
  pDelBlock->info.version = 0;
  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;

  int32_t code = tAppendDataToInputQueue(pStreamTask, (SStreamQueueItem*)pItem);
  qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
}

/*
 * Transfer the executor state of the fill-history task pTask to its related stream task, then unregister pTask.
 *
 * If the related stream task cannot be acquired, pTask is unregistered, the meta is committed and
 * TSDB_CODE_STREAM_TASK_NOT_EXIST is returned. Otherwise:
 * - the stream task is halted (non-source) and waited on until idle;
 * - its time window is widened;
 * - the operator state is moved;
 * - the stream task is resumed;
 * - pTask is unregistered and the stream task is saved;
 * - the stream task is finally scheduled.
 * Returns TSDB_CODE_SUCCESS in that case.
 *
 * NOTE: pTask must not be touched after streamMetaUnregisterTask(), which is described as freeing it.
 */
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    qError(
        "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
        "fill-history task",
        pTask->id.idStr, pTask->streamTaskId.taskId);

    // 1. free it and remove fill-history task from disk meta-store
    streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);

    // 2. save to disk (pTask may already be freed here, so only pMeta is used for logging)
    taosWLockLatch(&pMeta->lock);
    int32_t commitRes = streamMetaCommit(pMeta);
    taosWUnLockLatch(&pMeta->lock);
    if (commitRes < 0) {
      qError("vgId:%d failed to commit stream meta, since %s", pMeta->vgId, terrstr());
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  } else {
    qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
           pStreamTask->id.idStr);
  }

  ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);

  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // todo. the dropping status should be appended to the status after the halt is completed.
  // A source stream task must already be halted here, since it is halted when the related scan-history-data task
  // starts to scan the history for step 2.
  int8_t status = pStreamTask->status.taskStatus;
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
  } else {
    ASSERT(status == TASK_STATUS__SCAN_HISTORY);
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
    qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
  }

  // wait for the stream task to handle all in the inputQ, and to be idle
  waitForTaskIdle(pTask, pStreamTask);

  // In case of sink tasks, no need to halt them.
  // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
  // start the task state transfer procedure.
  // When a task is idle with halt status, all data in inputQ are consumed.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
           ", status:%s, sched-status:%d",
           pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
           pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
  } else {
    qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
  }

  // 1. expand the query time window for stream task of WAL scanner
  pTimeWindow->skey = INT64_MIN;
  qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);

  // 2. transfer the ownership of executor state
  streamTaskReleaseState(pTask);
  streamTaskReloadState(pStreamTask);

  // 3. clear the link between fill-history task and stream task info
  pStreamTask->historyTaskId.taskId = 0;

  // 4. resume the state of stream task. After this call the stream task will run immediately, but it cannot be
  // paused yet, since the pause-allowed attribute is not set.
  streamTaskResumeFromHalt(pStreamTask);

  qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);

  // 5. free it and remove fill-history task from disk meta-store
  streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);

  // 6. save to disk
  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pStreamTask);
  int32_t commitRes = streamMetaCommit(pMeta);
  taosWUnLockLatch(&pMeta->lock);
  if (commitRes < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pStreamTask->id.idStr, terrstr());
  }

  // 7. pause allowed.
  streamTaskEnablePause(pStreamTask);

  // with an empty inputQ, give the stream task a dummy (empty) delete block before it is scheduled below
  if (taosQueueEmpty(pStreamTask->inputQueue->queue)) {
    appendDummyDeleteBlock(pStreamTask);
  }

  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}

394
/*
 * Finish the fill-history phase of pTask.
 *
 * A source task first calls streamTaskFillHistoryFinished(). Source and agg tasks then transfer their operator
 * states through streamDoTransferStateToStreamTask(). Other levels do nothing further.
 *
 * Returns the transfer result, or TSDB_CODE_SUCCESS when no transfer is performed.
 */
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  ASSERT(pTask->status.appendTranstateBlock == 1);

  const int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__SOURCE) {
    streamTaskFillHistoryFinished(pTask);
  }

  bool transferOperatorState = (level == TASK_LEVEL__AGG) || (level == TASK_LEVEL__SOURCE);
  return transferOperatorState ? streamDoTransferStateToStreamTask(pTask) : TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
410 411 412 413
/*
 * Extract the input for the next execution round from the input queue of pTask.
 *
 * Sink tasks:
 * - take a single queue item per call.
 *
 * Other tasks:
 * - merge consecutive data items with streamMergeQueueItem() into *pInput, until MAX_STREAM_EXEC_BATCH_NUM items
 *   are merged or the queue is drained;
 * - only source tasks retry an empty queue, up to MAX_RETRY_TIMES times with a 10ms sleep;
 * - checkpoint, checkpoint-trigger and transfer-state items are never merged. Such an item is returned alone when
 *   nothing was extracted yet. Otherwise streamQueueProcessFail() is called, presumably so the item is delivered
 *   again after the already-extracted blocks are handled.
 *
 * Extraction stops immediately when the task should pause or stop.
 *
 * pInput      - in/out: NULL on entry from streamExecForAll(); set to the extracted (possibly merged) item.
 * numOfBlocks - in/out: number of queue items placed into *pInput.
 *
 * Always returns TSDB_CODE_SUCCESS; callers detect "nothing extracted" through *pInput == NULL.
 */
static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
  const int32_t MAX_RETRY_TIMES = 5;
  int32_t       retryTimes = 0;
  const char*   id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {  // extract block from inputQ, one-by-one
    if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    // retrying on an empty queue only applies to source tasks, so a sink task stops as soon as the queue is empty
    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
      return TSDB_CODE_SUCCESS;
    }

    qDebug("s-task:%s sink task handle result block one-by-one", id);
    *numOfBlocks = 1;
    *pInput = qItem;
    return TSDB_CODE_SUCCESS;
  }

  // non sink task
  while (1) {
    if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
        taosMsleep(10);
        qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
        continue;
      }

      qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
      return TSDB_CODE_SUCCESS;
    }

    // do not merge blocks for sink node and check point data block
    if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
        qItem->type == STREAM_INPUT__TRANS_STATE) {
      if (*pInput == NULL) {
        qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id);
        *numOfBlocks = 1;
        *pInput = qItem;
        return TSDB_CODE_SUCCESS;
      } else {
        // previously extracted blocks need to be handled before the checkpoint msg block
        qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id,
               *numOfBlocks);
        streamQueueProcessFail(pTask->inputQueue);
        return TSDB_CODE_SUCCESS;
      }
    } else {
      if (*pInput == NULL) {
        ASSERT((*numOfBlocks) == 0);
        *pInput = qItem;
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = streamMergeQueueItem(*pInput, qItem);
        if (newRet == NULL) {
          qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
          streamQueueProcessFail(pTask->inputQueue);
          return TSDB_CODE_SUCCESS;
        }

        *pInput = newRet;
      }

      *numOfBlocks += 1;
      streamQueueProcessSuccess(pTask->inputQueue);

      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
        qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
        return TSDB_CODE_SUCCESS;
      }
    }
  }
}

502 503 504 505 506 507 508 509
/*
 * Handle a transfer-state block received by pTask.
 *
 * Agg and sink tasks first align on their upstream tasks: while streamAlignTransferState() reports remaining
 * upstreams, the block is freed and 0 is returned.
 *
 * Dispatching tasks (fixed/shuffle): at agg or source level the block is written into the output queue and
 * dispatched (or freed if the write fails).
 *
 * Any other task frees the block and transfers its state directly via streamTransferStateToStreamTask(); on
 * failure its sched-status is reset to INACTIVE.
 *
 * Returns the queue write / state transfer result.
 */
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char*   id = pTask->id.idStr;
  const int32_t level = pTask->info.taskLevel;
  int32_t       code = TSDB_CODE_SUCCESS;

  bool mustAlign = (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK);
  if (mustAlign) {
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
      return 0;
    }
  }

  const int32_t outputType = pTask->outputInfo.type;
  bool isDispatchTask = (outputType == TASK_OUTPUT__FIXED_DISPATCH || outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);

  if (!isDispatchTask) {
    // non-dispatch task: transfer the executor state directly
    qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);

    streamFreeQitem((SStreamQueueItem*)pBlock);
    ASSERT(pTask->info.fillHistory == 1);

    code = streamTransferStateToStreamTask(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    }
    return code;
  }

  if (level == TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s add transfer-state block into outputQ", id);
  } else {
    qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
    ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
  }

  // agg/source tasks dispatch the trans-state block downstream, to flush all data to the sink task
  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
    pBlock->srcVgId = pTask->pMeta->vgId;
    code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
    if (code != 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
    } else {
      streamDispatchStreamBlock(pTask);
    }
  }

  return code;
}

553 554 555 556
/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
557
int32_t streamExecForAll(SStreamTask* pTask) {
558 559
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
560
  while (1) {
561
    int32_t batchSize = 0;
562
    SStreamQueueItem* pInput = NULL;
563 564 565 566
    if (streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s stream task stopped, abort", id);
      break;
    }
567

568
    // merge multiple input data if possible in the input queue.
569
    qDebug("s-task:%s start to extract data block from inputQ", id);
L
Liu Jicong 已提交
570

H
Haojun Liao 已提交
571
    /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize);
572
    if (pInput == NULL) {
573
      ASSERT(batchSize == 0);
L
Liu Jicong 已提交
574 575 576
      break;
    }

577 578 579 580 581
    if (pInput->type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
      return 0;
    }

582
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
583
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
584
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
585
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
586
      continue;
L
Liu Jicong 已提交
587
    }
L
Liu Jicong 已提交
588

589
    int64_t st = taosGetTimestampMs();
590
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
591

592 593
    {
      // set input
594
      void* pExecutor = pTask->exec.pExecutor;
595 596 597 598 599 600

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
601
        ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
602 603
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
604
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
605 606 607 608 609 610
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
611
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
612 613 614 615 616 617
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
618
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
619 620 621 622 623 624
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
625
      }
626
    }
627

628 629 630
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
631

632
    double  el = (taosGetTimestampMs() - st) / 1000.0;
633 634
    qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, el, resSize / 1048576.0, totalBlocks);
635

636
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
637
  }
638

L
Liu Jicong 已提交
639
  return 0;
L
Liu Jicong 已提交
640 641
}

642 643
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
644
bool streamTaskIsIdle(const SStreamTask* pTask) {
645 646
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP ||
          pTask->status.taskStatus == TASK_STATUS__DROPPING);
647 648
}

649 650 651 652 653
/*
 * Mark the end of step 2 (scanning history from the WAL): log the elapsed time since tsInfo.step2Start and append
 * a transfer-state item to the task's input queue, so that downstream tasks transfer executor state after all
 * history blocks are handled. Always returns TSDB_CODE_SUCCESS.
 */
int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
  qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr,
         (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0);

  appendTranstateIntoInputQ(pTask);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
659
/*
 * Execute pTask if it is waiting to be scheduled.
 *
 * This function may be called by multiple threads. The sched-status is switched from WAITING to ACTIVE with an
 * atomic compare-and-exchange, and only the caller that performs the switch runs streamExecForAll(); every other
 * caller only logs.
 *
 * After execution, the sched-status is set back to INACTIVE. The task is rescheduled via streamSchedExec() when its
 * input queue is not empty and it should neither stop nor pause.
 *
 * Returns -1 (with sched-status FAILED) if streamExecForAll() returns a negative value, 0 otherwise.
 */
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t schedStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);

  const char* id = pTask->id.idStr;

  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {  // todo this status should be removed
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
      return -1;
    }

    // todo the task should be committed here

    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
           pTask->status.schedStatus);

    // input may have arrived while executing: schedule again unless the task is stopping or pausing
    if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
          streamTaskShouldPause(&pTask->status))) {
      streamSchedExec(pTask);
    }
  } else {
    qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
           streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  }

  return 0;
}
L
liuyao 已提交
711 712

/*
 * Release the operator state held by the task's executor.
 * Returns the qStreamOperatorReleaseState() result, or TSDB_CODE_SUCCESS when the task has no executor.
 */
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  qDebug("s-task:%s release exec state", pTask->id.idStr);

  void* pExecutor = pTask->exec.pExecutor;
  return (pExecutor == NULL) ? TSDB_CODE_SUCCESS : qStreamOperatorReleaseState(pExecutor);
}

/*
 * Reload the operator state into the task's executor.
 * Returns the qStreamOperatorReloadState() result, or TSDB_CODE_SUCCESS when the task has no executor.
 */
int32_t streamTaskReloadState(SStreamTask* pTask) {
  qDebug("s-task:%s reload exec state", pTask->id.idStr);

  void* pExecutor = pTask->exec.pExecutor;
  return (pExecutor == NULL) ? TSDB_CODE_SUCCESS : qStreamOperatorReloadState(pExecutor);
}
H
Haojun Liao 已提交
733 734 735 736 737 738 739 740 741 742

/*
 * Count one received upstream transfer-state message and return how many upstream tasks are still outstanding.
 *
 * When the counter is 0, it is atomically set to the number of upstream tasks. Each call then atomically
 * decrements it and returns the new value.
 */
int32_t streamAlignTransferState(SStreamTask* pTask) {
  const int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);

  if (atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream) == 0) {
    qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);
  }

  return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
}