streamExec.c 19.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// Upper bound on the number of input items merged into a single execution batch.
// One merged item may itself include several submit blocks.
#define MAX_STREAM_EXEC_BATCH_NUM        32
// Source tasks briefly retry fetching input while the current batch is smaller than this.
#define MIN_STREAM_EXEC_BATCH_NUM        4
// Number of collected result blocks after which the results are dumped to the output.
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
5
54liuyao 已提交
22

Y
yihaoDeng 已提交
23
// Forward declaration: doDumpResult() calls this before the definition further down in this file.
static int32_t updateCheckPointInfo(SStreamTask* pTask);
24

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
/**
 * Update the checkpoint info of the task and push the collected result blocks to its output.
 *
 * @param pTask       task that produced the results
 * @param pItem       input item the results were generated from
 * @param pRes        array of result blocks; destroyed here when it is empty or when an
 *                    early step fails (NOTE(review): on success createStreamBlockFromResults()
 *                    presumably takes ownership of it - confirm)
 * @param size        accumulated size in bytes of the blocks in pRes
 * @param totalSize   [in/out] increased by size once the blocks have been handed over
 * @param totalBlocks [in/out] increased by the number of blocks once they have been handed over
 * @return TSDB_CODE_SUCCESS on success; the code of updateCheckPointInfo() when that fails;
 *         -1 when the stream data block cannot be created or the output queue reports
 *         TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY.
 */
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t ret = updateCheckPointInfo(pTask);
  if (ret != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return ret;
  }

  const int32_t nBlocks = taosArrayGetSize(pRes);
  if (nBlocks <= 0) {
    // nothing to deliver, only release the (empty) container
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pDataBlock = createStreamBlockFromResults(pItem, pTask, size, pRes);
  if (pDataBlock == NULL) {
    qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, nBlocks,
         size / 1048576.0);

  ret = streamTaskOutputResultBlock(pTask, pDataBlock);
  if (ret == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
    destroyStreamDataBlock(pDataBlock);
    return -1;
  }

  *totalSize += size;
  *totalBlocks += nBlocks;
  return TSDB_CODE_SUCCESS;
}

Y
yihaoDeng 已提交
70 71
/**
 * Drive the executor of a task until it yields no more output for the input that has been
 * set, collecting the produced blocks and handing them to doDumpResult().
 *
 * Blocks are gathered in an array; whenever MAX_STREAM_RESULT_DUMP_THRESHOLD blocks have been
 * collected the array is dumped and a fresh one is started on the next iteration.
 *
 * @param pTask       task whose executor (pTask->exec.pExecutor) is run
 * @param pItem       input item being processed; for STREAM_INPUT__DATA_RETRIEVE its single
 *                    block is forwarded as a STREAM_PULL_OVER block once the executor is done
 * @param totalSize   [out] reset to 0 here, then increased by doDumpResult()
 * @param totalBlocks [out] reset to 0 here, then increased by doDumpResult()
 * @return 0 when the task is asked to stop; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 *         the result array cannot be allocated; the failure code of doDumpResult() when a dump
 *         fails; otherwise the code of the last operation.
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        // without this check the NULL array would be passed to taosArrayPush()/taosArrayGetSize() below
        qError("s-task:%s failed to allocate the result block list", pTask->id.idStr);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      // NOTE(review): the executor is retried immediately after any failure; confirm this cannot spin forever
      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      // the executor is drained; a retrieve request is answered with a STREAM_PULL_OVER block
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock             block = {0};
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->info.selfChildId;
        taosArrayPush(pRes, &block);
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64 " dump results", pTask->id.idStr,
               pTask->info.selfChildId, pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->info.selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // doDumpResult() has consumed the array; a new one is allocated on the next iteration
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}

165
/**
 * Run the scan executor of a source task and push the scanned blocks to the task output in
 * batches of at most batchSz blocks, until qStreamRecoverScanFinished() reports completion.
 *
 * @param pTask   source-level task (asserted) whose executor is run
 * @param batchSz maximum number of blocks collected before one batch is pushed to the output
 * @return 0 when the scan completes, or when the task is stopped or paused; -1 with terrno set
 *         to TSDB_CODE_OUT_OF_MEMORY when an allocation fails;
 *         TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output rejects a batch.
 */
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        continue;
      }

      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
          if (streamTaskShouldPause(&pTask->status)) {
            // the task is paused: flush what has been collected so far and leave
            SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
            if (qRes == NULL) {
              taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
              terrno = TSDB_CODE_OUT_OF_MEMORY;
              return -1;
            }

            qRes->type = STREAM_INPUT__DATA_BLOCK;
            qRes->blocks = pRes;
            code = streamTaskOutputResultBlock(pTask, qRes);
            if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
              taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
              taosFreeQitem(qRes);
              return code;
            }
            return 0;
          }
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->info.selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // the array is empty in both branches; release it, the next round allocates a new one
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      } else {
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
        continue;
      }
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (finished) {
      break;
    }
  }
  return 0;
}

#if 0
// Disabled draft of a batch-oriented executor; excluded from compilation by the enclosing #if 0.
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  const int32_t nItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (nItems == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }

  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL && pMerged == NULL) {
    return 0;
  }
  // (when pItem is NULL but a merged item exists, the merged item is to be processed here)

  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
  }

  // exec impl

  // output
  // try dispatch
  return 0;
}
#endif
L
Liu Jicong 已提交
305

Y
yihaoDeng 已提交
306
/**
 * Refresh the checkpoint info of a task from its executor and persist it when the
 * checkpoint id reported by the executor is newer than the recorded one.
 *
 * @param pTask task whose chkInfo is compared against qGetCheckpointVersion()
 * @return TSDB_CODE_SUCCESS when nothing changed or the new info was committed;
 *         -1 when streamMetaCommit() fails.
 */
int32_t updateCheckPointInfo(SStreamTask* pTask) {
  int64_t newCkId = 0;
  int64_t newVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &newVer, &newCkId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (newCkId <= pInfo->id) {
    // checkpoint not advanced, nothing to save
    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64,
         pTask->id.idStr, pInfo->version, newVer, pInfo->id, newCkId);

  pTask->chkInfo = (SCheckpointInfo){.version = newVer, .id = newCkId, .currentVer = pInfo->currentVer};

  // save and commit while holding the write latch of the meta
  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  const bool commitFailed = (streamMetaCommit(pTask->pMeta) < 0);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitFailed) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

335 336 337 338 339 340 341 342 343 344 345 346
/**
 * Block the caller until streamTaskIsIdle() reports the stream task as idle, polling every 100ms.
 *
 * @param pTask       the waiting task, used for logging only
 * @param pStreamTask the stream task that is waited for
 */
static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  const int64_t startTs = taosGetTimestampMs();

  for (;;) {
    if (streamTaskIsIdle(pStreamTask)) {
      break;
    }

    qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
           pTask->info.taskLevel, pStreamTask->id.idStr);
    taosMsleep(100);
  }

  // only report the wait when some measurable time has elapsed
  const double elapsedSec = (taosGetTimestampMs() - startTs) / 1000.0;
  if (elapsedSec > 0) {
    qDebug("s-task:%s wait for stream task:%s for %.2fs to handle all data in inputQ", pTask->id.idStr,
           pStreamTask->id.idStr, elapsedSec);
  }
}

/**
 * Called when a scan-history task has finished: halt the related stream task, wait until it
 * is idle, widen its scan window to start at INT64_MIN, then set it back to normal and
 * schedule it.
 *
 * @param pTask the finished scan-history task; pTask->streamTaskId identifies the stream task
 * @return TSDB_CODE_SUCCESS on success; -1 when the related stream task cannot be acquired.
 */
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    // must be checked before the first dereference below
    qError("s-task:%s failed to acquire the related stream task, it may have been dropped, state transfer aborted",
           pTask->id.idStr);
    return -1;
  }

  qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);

  ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId);
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // here we need to wait for the stream task handle all data in the input queue.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
  } else {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
  }

  // wait for the stream task to be idle
  waitForTaskTobeIdle(pTask, pStreamTask);

  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
               ", status:%s, sched-status:%d",
           pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
           pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);

    // todo transfer state
  } else {
    // for sink tasks, they are continue to execute, no need to be halt.
    // the process should be stopped for a while, during the term of transfer task state.
    // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
    qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);

    // todo transfer state
  }

  // expand the query time window for stream scanner
  pTimeWindow->skey = INT64_MIN;

  streamSetStatusNormal(pStreamTask);
  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pTask->pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}

398 399 400 401
/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
402
int32_t streamExecForAll(SStreamTask* pTask) {
403 404
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
405
  while (1) {
406
    int32_t batchSize = 1;
L
liuyao 已提交
407
    int16_t times = 0;
408

409 410
    SStreamQueueItem* pInput = NULL;

411
    // merge multiple input data if possible in the input queue.
412
    qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
H
Haojun Liao 已提交
413

L
Liu Jicong 已提交
414
    while (1) {
415
      // downstream task's input queue is blocked, stop immediately
416
      if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
H
Haojun Liao 已提交
417
          streamTaskShouldStop(&pTask->status)) {
L
liuyao 已提交
418 419 420
        if (batchSize > 1) {
          break;
        } else {
421
          qDebug("123 %s", pTask->id.idStr);
L
liuyao 已提交
422 423
          return 0;
        }
L
liuyao 已提交
424
      }
425

L
Liu Jicong 已提交
426 427
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
428
        if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
L
liuyao 已提交
429
          times++;
430
          taosMsleep(10);
431
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
432 433
          continue;
        }
434

L
liuyao 已提交
435
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
436
        break;
L
Liu Jicong 已提交
437
      }
438 439 440

      if (pInput == NULL) {
        pInput = qItem;
441
        streamQueueProcessSuccess(pTask->inputQueue);
442
        if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
443
          break;
L
Liu Jicong 已提交
444
        }
L
Liu Jicong 已提交
445
      } else {
446
        // todo we need to sort the data block, instead of just appending into the array list.
447 448
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
449 450 451
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
452 453
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
454
          streamQueueProcessSuccess(pTask->inputQueue);
455

L
liuyao 已提交
456
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
457
            qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
458
                   MAX_STREAM_EXEC_BATCH_NUM);
5
54liuyao 已提交
459 460
            break;
          }
L
Liu Jicong 已提交
461
        }
L
Liu Jicong 已提交
462 463
      }
    }
464

465
    if (streamTaskShouldStop(&pTask->status)) {
466 467 468
      if (pInput) {
        streamFreeQitem(pInput);
      }
L
Liu Jicong 已提交
469
      return 0;
L
Liu Jicong 已提交
470
    }
L
Liu Jicong 已提交
471

472
    if (pInput == NULL) {
473 474
      if (pTask->info.fillHistory && pTask->status.transferState) {
        int32_t code = streamTransferStateToStreamTask(pTask);
H
Haojun Liao 已提交
475
      }
476

L
Liu Jicong 已提交
477 478 479
      break;
    }

480
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
481
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
482
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
483
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
484
      continue;
L
Liu Jicong 已提交
485
    }
L
Liu Jicong 已提交
486

487
    int64_t st = taosGetTimestampMs();
488
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
489

490 491
    {
      // set input
492
      void* pExecutor = pTask->exec.pExecutor;
493 494 495 496 497 498

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
499
        ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
500 501
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
502
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
503 504 505 506 507 508
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
509
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
510 511 512 513 514 515
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
516
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
517 518 519 520 521 522
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
523
      }
524
    }
525

526 527 528
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
529

530
    double  el = (taosGetTimestampMs() - st) / 1000.0;
H
Haojun Liao 已提交
531 532
    qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, batchSize, el, resSize / 1048576.0, totalBlocks);
533

534
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
535
  }
536

L
Liu Jicong 已提交
537
  return 0;
L
Liu Jicong 已提交
538 539
}

540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
/**
 * Check whether a task has nothing left to do.
 *
 * @param pTask task to inspect
 * @return true only if both containers of the input queue (queue and qall) are empty, the
 *         output is not blocked, and the schedule status is TASK_SCHED_STATUS__INACTIVE.
 */
bool streamTaskIsIdle(const SStreamTask* pTask) {
  // pending items, either still queued or already fetched into qall
  if (taosQueueItemSize(pTask->inputQueue->queue) > 0 || taosQallItemSize(pTask->inputQueue->qall) > 0) {
    return false;
  }

  // blocked by downstream task
  const bool outputBlocked = (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED);
  if (outputBlocked) {
    return false;
  }

  return pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE;
}

L
Liu Jicong 已提交
559
/**
 * Execute a task if it is waiting to be scheduled.
 *
 * The schedule status is switched from WAITING to ACTIVE with a compare-and-exchange, so
 * only the caller that wins the exchange runs the task.
 *
 * @param pTask task to execute
 * @return 0 when the task was not waiting or finished executing; -1 when streamExecForAll()
 *         fails, in which case the schedule status is set to TASK_SCHED_STATUS__FAILED.
 */
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  const int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevStatus != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  if (streamExecForAll(pTask) < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
         pTask->status.schedStatus);

  // input arrived in the meantime and the task is neither stopped nor paused: schedule it again
  if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
        streamTaskShouldPause(&pTask->status))) {
    streamSchedExec(pTask);
  }

  return 0;
}
L
liuyao 已提交
584 585

/**
 * Ask the executor of a task to release its operator state.
 *
 * @param pTask task whose executor is addressed
 * @return TSDB_CODE_SUCCESS when the task has no executor, otherwise the result of
 *         qStreamOperatorReleaseState().
 */
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReleaseState(pExec);
}

/**
 * Ask the executor of a task to reload its operator state.
 *
 * @param pTask task whose executor is addressed
 * @return TSDB_CODE_SUCCESS when the task has no executor, otherwise the result of
 *         qStreamOperatorReloadState().
 */
int32_t streamTaskReloadState(SStreamTask* pTask) {
  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReloadState(pExec);
}