streamExec.c 26.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM 32
20
// NOTE(review): not referenced anywhere in this file; presumably a lower bound for the exec batch size -- confirm
#define MIN_STREAM_EXEC_BATCH_NUM 4
21
// number of result blocks streamTaskExecImpl accumulates before it dumps them via doDumpResult
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  100
5
54liuyao 已提交
22

Y
yihaoDeng 已提交
23
static int32_t updateCheckPointInfo(SStreamTask* pTask);
24
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
25

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
33
  return (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT);
L
liuyao 已提交
34 35
}

36 37
/**
 * Persist the checkpoint info if needed, then wrap the accumulated result blocks into a stream data block and
 * hand it to streamTaskOutputResultBlock.
 *
 * @param pTask        the task that produced the results.
 * @param pItem        the input item the results were generated from.
 * @param pRes         list of result blocks. It is destroyed here when the checkpoint update fails, when the list
 *                     is empty, or when the stream data block cannot be created; otherwise it is passed to
 *                     createStreamBlockFromResults.
 * @param size         estimated size in bytes of the blocks in pRes.
 * @param totalSize    incremented by size when the blocks are delivered.
 * @param totalBlocks  incremented by the number of blocks when they are delivered.
 * @return TSDB_CODE_SUCCESS on success (including an empty list), the error of updateCheckPointInfo, or -1 when the
 *         data block cannot be created or the output queue reports TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY.
 */
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t ret = updateCheckPointInfo(pTask);
  if (ret != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return ret;
  }

  int32_t nBlocks = taosArrayGetSize(pRes);
  if (nBlocks <= 0) {
    // nothing to dispatch, just release the (empty) list
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOutBlock = createStreamBlockFromResults(pItem, pTask, size, pRes);
  if (pOutBlock == NULL) {
    qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, nBlocks,
         size / 1048576.0);

  ret = streamTaskOutputResultBlock(pTask, pOutBlock);
  if (ret == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
    // back pressure: the block was not accepted, release it here
    destroyStreamDataBlock(pOutBlock);
    return -1;
  }

  *totalSize += size;
  *totalBlocks += nBlocks;
  return TSDB_CODE_SUCCESS;
}

Y
yihaoDeng 已提交
71 72
/**
 * Drive the executor until it produces no more output for the current input, collecting the result blocks and
 * dumping them through doDumpResult in batches of MAX_STREAM_RESULT_DUMP_THRESHOLD blocks.
 *
 * @param pTask        the task to execute.
 * @param pItem        the input item that has already been set on the executor.
 * @param totalSize    reset to 0, then accumulates the size of the dumped results.
 * @param totalBlocks  reset to 0, then accumulates the number of dumped result blocks.
 * @return TSDB_CODE_SUCCESS (0) on success or when the task is asked to stop, the error returned by doDumpResult,
 *         or -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when the result list cannot be allocated.
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    // a fresh list is needed at start and after every dump, since doDumpResult consumes the list
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        qError("s-task:%s failed to allocate the result block list, out of memory", pTask->id.idStr);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      // no more output: for a retrieve request, append a STREAM_PULL_OVER block built from the request block
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock             block = {0};
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->info.selfChildId;
        taosArrayPush(pRes, &block);
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64" dump results", pTask->id.idStr, pTask->info.selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->info.selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  // dump the remaining blocks, or release the unused list
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}

166
/**
 * Run the scan-history executor of a source task, emitting the scanned blocks in batches.
 *
 * The outer loop runs until the executor reports that the recover scan is finished or the task is paused; each
 * round collects at most batchSize blocks and hands them to streamTaskOutputResultBlock.
 *
 * @param pTask      a source-level task (asserted).
 * @param batchSize  maximum number of blocks gathered into one output item.
 * @return 0 on completion, pause or stop; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY on allocation failure;
 *         TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output is rejected with that code.
 */
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  void* pExecutor = pTask->exec.pExecutor;
  bool  scanDone = false;

  qSetStreamOpOpen(pExecutor);

  for (; !scanDone;) {
    if (streamTaskShouldPause(&pTask->status)) {
      double elapsed = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
      qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, elapsed);
      break;
    }

    SArray* pBlockList = taosArrayInit(0, sizeof(SSDataBlock));
    if (pBlockList == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // gather up to batchSize blocks from the executor
    int32_t nBlocks = 0;
    for (;;) {
      if (streamTaskShouldStop(&pTask->status)) {
        taosArrayDestroyEx(pBlockList, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* pOutput = NULL;
      uint64_t     ts = 0;
      int32_t      execCode = qExecTask(pExecutor, &pOutput, &ts);
      if (execCode != TSDB_CODE_SUCCESS && execCode != TSDB_CODE_TSC_QUERY_KILLED) {
        qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(execCode));
        continue;
      }

      // results generated before the fill-history task was paused still have to be dispatched
      if (pOutput == NULL) {
        scanDone = qStreamRecoverScanFinished(pExecutor);
        break;
      }

      SSDataBlock copied = {0};
      assignOneDataBlock(&copied, pOutput);
      copied.info.childId = pTask->info.selfChildId;
      taosArrayPush(pBlockList, &copied);

      nBlocks += 1;
      if (nBlocks >= batchSize) {
        qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, nBlocks, batchSize);
        break;
      }
    }

    if (taosArrayGetSize(pBlockList) <= 0) {
      taosArrayDestroy(pBlockList);
      continue;
    }

    SStreamDataBlock* pOutItem = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (pOutItem == NULL) {
      taosArrayDestroyEx(pBlockList, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pOutItem->type = STREAM_INPUT__DATA_BLOCK;
    pOutItem->blocks = pBlockList;

    int32_t outCode = streamTaskOutputResultBlock(pTask, pOutItem);
    if (outCode == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pBlockList, (FDelete)blockDataFreeRes);
      taosFreeQitem(pOutItem);
      return outCode;
    }
  }

  return 0;
}
L
Liu Jicong 已提交
243

Y
yihaoDeng 已提交
244
/**
 * Read the checkpoint id/version from the executor and, when the id has advanced past the one recorded in the
 * task, store the new values in pTask->chkInfo and save/commit the task meta under the meta write lock.
 *
 * @param pTask  the task whose checkpoint info is refreshed.
 * @return TSDB_CODE_SUCCESS when nothing changed or the commit succeeded, -1 when streamMetaCommit fails.
 */
int32_t updateCheckPointInfo(SStreamTask* pTask) {
  int64_t newCkId = 0;
  int64_t newVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &newVer, &newCkId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (newCkId <= pInfo->id) {
    // checkpoint has not moved forward, nothing to save
    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64,
         pTask->id.idStr, pInfo->version, newVer, pInfo->id, newCkId);

  // only version/id change; currentVer is carried over, every other field is zeroed
  SCheckpointInfo updated = {.version = newVer, .id = newCkId, .currentVer = pInfo->currentVer};
  pTask->chkInfo = updated;

  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  int32_t commitRes = streamMetaCommit(pTask->pMeta);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitRes < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

273
/**
 * Block the caller, polling every 100ms, until streamTaskIsIdle reports the stream task as idle.
 *
 * @param pTask        the waiting task, used for logging only.
 * @param pStreamTask  the task that is polled.
 */
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  const int64_t startTs = taosGetTimestampMs();

  for (;;) {
    if (streamTaskIsIdle(pStreamTask)) {
      break;
    }

    qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
           pTask->info.taskLevel, pStreamTask->id.idStr);
    taosMsleep(100);
  }

  // report the waiting time only if some time has actually elapsed
  double waited = (taosGetTimestampMs() - startTs) / 1000.0;
  if (waited > 0) {
    qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", pTask->id.idStr,
           pStreamTask->id.idStr, waited);
  }
}

290
/**
 * Transfer the executor state of a finished fill-history task to its related stream task.
 *
 * Steps: acquire the related stream task, halt it (non-source levels) and wait until it is idle, expand its time
 * window, release the state from pTask and reload it in the stream task, unlink and resume the stream task,
 * unregister the fill-history task, persist the meta, enable pause, and finally schedule the stream task (appending
 * a dummy delete block first when its input queue is empty).
 *
 * @param pTask  the fill-history task. It is unregistered from the meta in here, so it is not touched after the
 *               streamMetaUnregisterTask call.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_STREAM_TASK_NOT_EXIST when the related stream task cannot be acquired.
 */
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    qError(
        "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
        "fill-history task",
        pTask->id.idStr, pTask->streamTaskId.taskId);

    // 1. free it and remove fill-history task from disk meta-store
    streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);

    // 2. save to disk
    taosWLockLatch(&pMeta->lock);
    if (streamMetaCommit(pMeta) < 0) {
      // pTask has been unregistered above, so it is deliberately not referenced in this message
      qError("failed to commit stream meta after removing fill-history task, since %s", terrstr());
    }
    taosWUnLockLatch(&pMeta->lock);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  } else {
    qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
           pStreamTask->id.idStr);
  }

  ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);

  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // todo. the dropping status should be append to the status after the halt completed.
  // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
  // for the step 2.
  int8_t status = pStreamTask->status.taskStatus;
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
  } else {
    ASSERT(status == TASK_STATUS__SCAN_HISTORY);
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
    qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
  }

  // wait for the stream task to handle all in the inputQ, and to be idle
  waitForTaskIdle(pTask, pStreamTask);

  // In case of sink tasks, no need to halt them.
  // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
  // start the task state transfer procedure.
  // When a task is idle with halt status, all data in inputQ are consumed.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
           ", status:%s, sched-status:%d",
           pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
           pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
  } else {
    qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
  }

  // 1. expand the query time window for stream task of WAL scanner
  pTimeWindow->skey = INT64_MIN;
  qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);

  // 2. transfer the ownership of executor state
  streamTaskReleaseState(pTask);
  streamTaskReloadState(pStreamTask);

  // 3. clear the link between fill-history task and stream task info
  pStreamTask->historyTaskId.taskId = 0;

  // 4. resume the state of stream task, after this function, the stream task will run immediately. But it can not be
  // paused, since the pause allowed attribute is not set yet.
  streamTaskResumeFromHalt(pStreamTask);

  qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);

  // 5. free it and remove fill-history task from disk meta-store
  streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);

  // 6. save to disk
  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pStreamTask);
  if (streamMetaCommit(pMeta) < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pStreamTask->id.idStr, terrstr());
  }
  taosWUnLockLatch(&pMeta->lock);

  // 7. pause allowed.
  streamTaskEnablePause(pStreamTask);
  if (taosQueueEmpty(pStreamTask->inputQueue->queue)) {
    // the input queue is empty: append a dummy (0 rows) delete block before scheduling the task
    SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
    if (pItem == NULL) {
      qError("s-task:%s failed to allocate queue item for dummy delete block", pStreamTask->id.idStr);
    } else {
      SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
      if (pDelBlock == NULL) {
        qError("s-task:%s failed to create dummy delete block", pStreamTask->id.idStr);
        taosFreeQitem(pItem);
      } else {
        pDelBlock->info.rows = 0;
        pDelBlock->info.version = 0;
        pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
        pItem->pBlock = pDelBlock;
        int32_t code = tAppendDataToInputQueue(pStreamTask, (SStreamQueueItem*)pItem);
        qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
      }
    }
  }

  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}

394
/**
 * Entry point of the state transfer from a fill-history task to its stream task.
 *
 * Source tasks are first marked as fill-history finished; source and agg tasks then perform the actual state
 * transfer. Other levels do nothing.
 *
 * @param pTask  the fill-history task; appendTranstateBlock must be set (asserted).
 * @return the result of streamDoTransferStateToStreamTask, or TSDB_CODE_SUCCESS when no transfer is required.
 */
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  ASSERT(pTask->status.appendTranstateBlock == 1);

  const int32_t taskLevel = pTask->info.taskLevel;
  if (taskLevel == TASK_LEVEL__SOURCE) {
    streamTaskFillHistoryFinished(pTask);
  } else if (taskLevel != TASK_LEVEL__AGG) {
    // neither source nor agg: no operator state to transfer
    return TSDB_CODE_SUCCESS;
  }

  // do transfer task operator states.
  return streamDoTransferStateToStreamTask(pTask);
}

H
Haojun Liao 已提交
410 411 412 413
/**
 * Pull the next unit of work from the task's input queue.
 *
 * Sink tasks receive exactly one queue item per call. Other tasks merge consecutive data items (up to
 * MAX_STREAM_EXEC_BATCH_NUM) into a single item; checkpoint, checkpoint-trigger and trans-state items are never
 * merged: they are returned alone, or pushed back via streamQueueProcessFail when data items were already
 * collected so that those are processed first.
 *
 * @param pTask        the task whose input queue is read.
 * @param pInput       in/out: expected to be NULL on entry; receives the extracted (possibly merged) item, and
 *                     stays NULL when nothing was extracted.
 * @param numOfBlocks  in/out: expected to be 0 on entry; receives the number of extracted queue items.
 * @return always TSDB_CODE_SUCCESS; callers detect "nothing to do" through *pInput == NULL.
 */
static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
  int32_t       retryTimes = 0;
  const int32_t MAX_RETRY_TIMES = 5;
  const char*   id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {  // extract block from inputQ, one-by-one
    // no retry here: the empty-queue retry only applies to source tasks, which never reach this branch
    if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
      return TSDB_CODE_SUCCESS;
    }

    qDebug("s-task:%s sink task handle result block one-by-one", id);
    *numOfBlocks = 1;
    *pInput = qItem;
    return TSDB_CODE_SUCCESS;
  }

  // non sink task
  while (1) {
    if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      // source tasks poll the empty queue a few more times (10ms apart) before giving up
      if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
        taosMsleep(10);
        qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
        continue;
      }

      qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
      return TSDB_CODE_SUCCESS;
    }

    // do not merge blocks for sink node and check point data block
    if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
        qItem->type == STREAM_INPUT__TRANS_STATE) {
      if (*pInput == NULL) {
        qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id);
        *numOfBlocks = 1;
        *pInput = qItem;
        return TSDB_CODE_SUCCESS;
      } else {
        // previous existed blocks needs to be handle, before handle the checkpoint msg block
        qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id,
               *numOfBlocks);
        streamQueueProcessFail(pTask->inputQueue);
        return TSDB_CODE_SUCCESS;
      }
    } else {
      if (*pInput == NULL) {
        ASSERT((*numOfBlocks) == 0);
        *pInput = qItem;
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = streamMergeQueueItem(*pInput, qItem);
        if (newRet == NULL) {
          qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
          streamQueueProcessFail(pTask->inputQueue);
          return TSDB_CODE_SUCCESS;
        }

        *pInput = newRet;
      }

      *numOfBlocks += 1;
      streamQueueProcessSuccess(pTask->inputQueue);

      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
        qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
        return TSDB_CODE_SUCCESS;
      }
    }
  }
}

502 503 504 505 506 507 508 509
/**
 * Handle a trans-state block taken from the input queue.
 *
 * Agg and sink tasks first wait until streamAlignTransferState reports no remaining upstream. After that, tasks
 * with a fixed/shuffle dispatch output forward the block to the output queue (source/agg) or drop it (sink);
 * tasks without a dispatch output drop the block and, unless they are sink tasks, start the state transfer
 * directly.
 *
 * @param pTask   the task that received the block.
 * @param pBlock  the trans-state block; on every path it is either freed or written into the output queue here.
 * @return 0/TSDB_CODE_SUCCESS, the error of taosWriteQitem, or the error of streamTransferStateToStreamTask.
 */
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char*   id = pTask->id.idStr;
  const int32_t taskLevel = pTask->info.taskLevel;

  if (taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK) {
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
      return 0;
    }
  }

  // dispatch the tran-state block to downstream task immediately
  const int32_t outputType = pTask->outputInfo.type;
  const bool    needDispatch = (outputType == TASK_OUTPUT__FIXED_DISPATCH || outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);

  if (!needDispatch) {
    // non-dispatch task, do task state transfer directly
    streamFreeQitem((SStreamQueueItem*)pBlock);
    if (taskLevel == TASK_LEVEL__SINK) {
      qDebug("s-task:%s sink task does not transfer state", id);
      return TSDB_CODE_SUCCESS;
    }

    qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
    ASSERT(pTask->info.fillHistory == 1);

    int32_t transferCode = streamTransferStateToStreamTask(pTask);
    if (transferCode != TSDB_CODE_SUCCESS) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    }
    return transferCode;
  }

  // transfer the ownership of executor state
  if (taskLevel == TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s add transfer-state block into outputQ", id);
  } else {
    qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
    ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
  }

  if (taskLevel != TASK_LEVEL__AGG && taskLevel != TASK_LEVEL__SOURCE) {
    // level == TASK_LEVEL__SINK
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return TSDB_CODE_SUCCESS;
  }

  // agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
  pBlock->srcVgId = pTask->pMeta->vgId;
  int32_t writeCode = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
  if (writeCode != 0) {
    streamFreeQitem((SStreamQueueItem*)pBlock);
  } else {
    streamDispatchStreamBlock(pTask);
  }

  return writeCode;
}

558 559 560 561
/**
 * todo: the batch of blocks should be tuned dynamically, according to the total elapsed time of each batch of blocks;
 * an appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
562
int32_t streamExecForAll(SStreamTask* pTask) {
563 564
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
565
  while (1) {
566
    int32_t batchSize = 0;
567
    SStreamQueueItem* pInput = NULL;
568 569 570 571
    if (streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s stream task stopped, abort", id);
      break;
    }
572

573
    // merge multiple input data if possible in the input queue.
574
    qDebug("s-task:%s start to extract data block from inputQ", id);
L
Liu Jicong 已提交
575

H
Haojun Liao 已提交
576
    /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize);
577
    if (pInput == NULL) {
578
      ASSERT(batchSize == 0);
L
Liu Jicong 已提交
579 580 581
      break;
    }

582 583 584 585 586
    if (pInput->type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
      return 0;
    }

587
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
588
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
589
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
590
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
591
      continue;
L
Liu Jicong 已提交
592
    }
L
Liu Jicong 已提交
593

594
    int64_t st = taosGetTimestampMs();
595
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
596

597 598
    {
      // set input
599
      void* pExecutor = pTask->exec.pExecutor;
600 601 602 603 604 605

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
606
        ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
607 608
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
609
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
610 611 612 613 614 615
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
616
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
617 618 619 620 621 622
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
623
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
624 625 626 627 628 629
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
630
      }
631
    }
632

633 634 635
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
636

637
    double  el = (taosGetTimestampMs() - st) / 1000.0;
638 639
    qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, el, resSize / 1048576.0, totalBlocks);
640

641
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
642
  }
643

L
Liu Jicong 已提交
644
  return 0;
L
Liu Jicong 已提交
645 646
}

647 648
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
649
bool streamTaskIsIdle(const SStreamTask* pTask) {
650 651
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP ||
          pTask->status.taskStatus == TASK_STATUS__DROPPING);
652 653
}

L
Liu Jicong 已提交
654
/**
 * Try to run the task: only the caller that switches the sched-status from WAITING to ACTIVE executes it.
 *
 * After a successful run the sched-status is set to INACTIVE, and the task is scheduled again when its input
 * queue is not empty and it is neither stopped nor paused.
 *
 * @param pTask  the task to execute.
 * @return 0 normally (including when another thread is already executing), -1 when streamExecForAll fails, in
 *         which case the sched-status is set to TASK_SCHED_STATUS__FAILED.
 */
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t prevSched =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);

  const char* id = pTask->id.idStr;

  if (prevSched != TASK_SCHED_STATUS__WAITING) {
    qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
           streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
    return 0;
  }

  int32_t execCode = streamExecForAll(pTask);
  if (execCode < 0) {  // todo this status should be removed
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
         pTask->status.schedStatus);

  // more input has arrived in the meantime and the task may still run: schedule it again
  if (!taosQueueEmpty(pTask->inputQueue->queue) && !streamTaskShouldStop(&pTask->status) &&
      !streamTaskShouldPause(&pTask->status)) {
    streamSchedExec(pTask);
  }

  return 0;
}
L
liuyao 已提交
684 685

/**
 * Ask the task's executor, if it has one, to release its operator state.
 *
 * @param pTask  the task whose executor state is released.
 * @return the result of qStreamOperatorReleaseState, or TSDB_CODE_SUCCESS when the task has no executor.
 */
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  qDebug("s-task:%s release exec state", pTask->id.idStr);

  void* pExecutor = pTask->exec.pExecutor;
  if (pExecutor == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReleaseState(pExecutor);
}

/**
 * Ask the task's executor, if it has one, to reload its operator state.
 *
 * @param pTask  the task whose executor state is reloaded.
 * @return the result of qStreamOperatorReloadState, or TSDB_CODE_SUCCESS when the task has no executor.
 */
int32_t streamTaskReloadState(SStreamTask* pTask) {
  qDebug("s-task:%s reload exec state", pTask->id.idStr);

  void* pExecutor = pTask->exec.pExecutor;
  if (pExecutor == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReloadState(pExecutor);
}
H
Haojun Liao 已提交
706 707 708 709 710 711 712 713 714 715

/**
 * Count down the upstream tasks that still have to deliver their transfer-state message.
 *
 * When the counter is 0 it is first armed with the number of upstream tasks; every call then decrements it by one.
 *
 * @param pTask  the receiving task.
 * @return the counter value after the decrement.
 */
int32_t streamAlignTransferState(SStreamTask* pTask) {
  int32_t upstreamCnt = taosArrayGetSize(pTask->pUpstreamEpInfoList);

  // arm the counter only if it is currently 0
  if (atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, upstreamCnt) == 0) {
    qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, upstreamCnt);
  }

  return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
}