streamExec.c 19.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM 32
20
#define MIN_STREAM_EXEC_BATCH_NUM 4
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  100
5
54liuyao 已提交
22

Y
yihaoDeng 已提交
23
static int32_t updateCheckPointInfo(SStreamTask* pTask);
24

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
37 38
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
39
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
40 41 42 43 44
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
45
    SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
46
    if (pStreamBlocks == NULL) {
47
      qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
48
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
49 50 51
      return -1;
    }

Y
yihaoDeng 已提交
52 53
    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
           size / 1048576.0);
54 55

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
Y
yihaoDeng 已提交
56
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
57
      destroyStreamDataBlock(pStreamBlocks);
58 59
      return -1;
    }
60 61 62

    *totalSize += size;
    *totalBlocks += numOfBlocks;
63
  } else {
64
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
65 66 67 68 69
  }

  return TSDB_CODE_SUCCESS;
}

Y
yihaoDeng 已提交
70 71
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
72 73
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
74

75 76 77 78 79
  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
H
Haojun Liao 已提交
80
  SArray* pRes = NULL;
L
Liu Jicong 已提交
81 82

  while (1) {
H
Haojun Liao 已提交
83 84 85
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }
H
Haojun Liao 已提交
86

87
    if (streamTaskShouldStop(&pTask->status)) {
Y
yihaoDeng 已提交
88
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
89 90 91
      return 0;
    }

L
Liu Jicong 已提交
92 93
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
94
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
95
      if (code == TSDB_CODE_QRY_IN_EXEC) {
96
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
97
      }
98 99

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
100
      continue;
L
Liu Jicong 已提交
101
    }
102

103
    if (output == NULL) {
5
54liuyao 已提交
104
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
Y
yihaoDeng 已提交
105
        SSDataBlock             block = {0};
Y
yihaoDeng 已提交
106
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
107
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
108

L
Liu Jicong 已提交
109
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
110
        block.info.type = STREAM_PULL_OVER;
111
        block.info.childId = pTask->info.selfChildId;
L
Liu Jicong 已提交
112
        taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
113
        numOfBlocks += 1;
114
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
L
Liu Jicong 已提交
115
               pRetrieveBlock->reqId);
116
      }
H
Haojun Liao 已提交
117

118 119
      break;
    }
L
Liu Jicong 已提交
120 121 122 123 124 125 126 127

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
128 129
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
130
    block.info.childId = pTask->info.selfChildId;
H
Haojun Liao 已提交
131

132 133 134
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
135
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
136

137
    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
138
           pTask->info.selfChildId, numOfBlocks, size / 1048576.0);
139 140

    // current output should be dispatched to down stream nodes
141 142 143
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
144 145 146 147
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

H
Haojun Liao 已提交
148
      pRes = NULL;
149 150
      size = 0;
      numOfBlocks = 0;
151
    }
152
  }
153

154 155 156
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
157
  } else {
Y
yihaoDeng 已提交
158
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
159
  }
160

H
Haojun Liao 已提交
161
  return code;
L
Liu Jicong 已提交
162 163
}

164
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
165
  int32_t code = 0;
166

167
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
168
  void* exec = pTask->exec.pExecutor;
169

L
Liu Jicong 已提交
170
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
171
  bool finished = false;
L
Liu Jicong 已提交
172

173 174 175 176 177 178 179 180 181
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
182
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
183
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
184 185 186
        return 0;
      }

187 188 189
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
190
        continue;
191
      }
192

L
Liu Jicong 已提交
193
      if (output == NULL) {
L
Liu Jicong 已提交
194 195 196 197 198
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
199 200
        break;
      }
201 202 203

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
204
      block.info.childId = pTask->info.selfChildId;
205 206
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
207 208
      batchCnt++;

209
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
210 211 212
      if (batchCnt >= batchSz) {
        break;
      }
213
    }
H
Haojun Liao 已提交
214

215
    if (taosArrayGetSize(pRes) == 0) {
216 217
      if (finished) {
        taosArrayDestroy(pRes);
H
Haojun Liao 已提交
218
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
219 220
        break;
      } else {
H
Haojun Liao 已提交
221
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
222 223
        continue;
      }
224
    }
H
Haojun Liao 已提交
225

S
Shengliang Guan 已提交
226
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
227 228 229 230 231 232 233 234
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
235
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
236
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
237
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
238
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
239 240
      return code;
    }
241 242 243 244

    if (finished) {
      break;
    }
245 246 247 248 249
  }
  return 0;
}

#if 0
250 251 252 253
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
254
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
255 256 257 258 259 260 261 262 263 264
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
265
    }
266
  }
267

268 269 270 271 272
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
273

274
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
275
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
276
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
277 278
  }

279 280 281 282
  // exec impl

  // output
  // try dispatch
283 284
  return 0;
}
285
#endif
L
Liu Jicong 已提交
286

Y
yihaoDeng 已提交
287
int32_t updateCheckPointInfo(SStreamTask* pTask) {
288 289 290 291 292 293 294
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
Y
yihaoDeng 已提交
295 296
           ", checkPoint id:%" PRId64 " -> %" PRId64,
           pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  // wait for the stream task to be idle
  int64_t st = taosGetTimestampMs();

  while (!streamTaskIsIdle(pStreamTask)) {
    qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
           pTask->info.taskLevel, pStreamTask->id.idStr);
    taosMsleep(100);
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  if (el > 0) {
    qDebug("s-task:%s wait for stream task:%s for %.2fs to execute all data in inputQ", pTask->id.idStr,
           pStreamTask->id.idStr, el);
  }
}

static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
  qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);

  // todo handle stream task is dropped here

  ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // here we need to wait for the stream task handle all data in the input queue.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
  } else {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
  }

  // wait for the stream task to be idle
  waitForTaskTobeIdle(pTask, pStreamTask);

  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
               ", status:%s, sched-status:%d",
           pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
           pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);

    // todo transfer state
  } else {
    // for sink tasks, they are continue to execute, no need to be halt.
    // the process should be stopped for a while, during the term of transfer task state.
    // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
    qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);

    // todo transfer state
  }

  // expand the query time window for stream scanner
  pTimeWindow->skey = INT64_MIN;

  streamSetStatusNormal(pStreamTask);
  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pTask->pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}

379 380 381 382
/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
383
int32_t streamExecForAll(SStreamTask* pTask) {
384 385
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
386
  while (1) {
387
    int32_t batchSize = 1;
L
liuyao 已提交
388
    int16_t times = 0;
389

390 391
    SStreamQueueItem* pInput = NULL;

392
    // merge multiple input data if possible in the input queue.
393
    qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
H
Haojun Liao 已提交
394

L
Liu Jicong 已提交
395
    while (1) {
396
      // downstream task's input queue is blocked, stop immediately
397
      if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
H
Haojun Liao 已提交
398
          streamTaskShouldStop(&pTask->status)) {
L
liuyao 已提交
399 400 401
        if (batchSize > 1) {
          break;
        } else {
402
          qDebug("123 %s", pTask->id.idStr);
L
liuyao 已提交
403 404
          return 0;
        }
L
liuyao 已提交
405
      }
406

L
Liu Jicong 已提交
407 408
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
409
        if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
L
liuyao 已提交
410
          times++;
411
          taosMsleep(10);
412
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
413 414
          continue;
        }
415

L
liuyao 已提交
416
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
417
        break;
L
Liu Jicong 已提交
418
      }
419 420 421

      if (pInput == NULL) {
        pInput = qItem;
422
        streamQueueProcessSuccess(pTask->inputQueue);
423
        if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
424
          break;
L
Liu Jicong 已提交
425
        }
L
Liu Jicong 已提交
426
      } else {
427
        // todo we need to sort the data block, instead of just appending into the array list.
428 429
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
430 431 432
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
433 434
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
435
          streamQueueProcessSuccess(pTask->inputQueue);
436

L
liuyao 已提交
437
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
438
            qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
439
                   MAX_STREAM_EXEC_BATCH_NUM);
5
54liuyao 已提交
440 441
            break;
          }
L
Liu Jicong 已提交
442
        }
L
Liu Jicong 已提交
443 444
      }
    }
445

446
    if (streamTaskShouldStop(&pTask->status)) {
447 448 449
      if (pInput) {
        streamFreeQitem(pInput);
      }
L
Liu Jicong 已提交
450
      return 0;
L
Liu Jicong 已提交
451
    }
L
Liu Jicong 已提交
452

453
    if (pInput == NULL) {
454 455
      if (pTask->info.fillHistory && pTask->status.transferState) {
        int32_t code = streamTransferStateToStreamTask(pTask);
H
Haojun Liao 已提交
456
      }
457

L
Liu Jicong 已提交
458 459 460
      break;
    }

461
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
462
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
463
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
464
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
465
      continue;
L
Liu Jicong 已提交
466
    }
L
Liu Jicong 已提交
467

468
    int64_t st = taosGetTimestampMs();
469
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
470

471 472
    {
      // set input
473
      void* pExecutor = pTask->exec.pExecutor;
474 475 476 477 478 479

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
480
        ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
481 482
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
483
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
484 485 486 487 488 489
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
490
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
491 492 493 494 495 496
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
497
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
498 499 500 501 502 503
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
504
      }
505
    }
506

507 508 509
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
510

511
    double  el = (taosGetTimestampMs() - st) / 1000.0;
H
Haojun Liao 已提交
512 513
    qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, batchSize, el, resSize / 1048576.0, totalBlocks);
514

515
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
516
  }
517

L
Liu Jicong 已提交
518
  return 0;
L
Liu Jicong 已提交
519 520
}

521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
bool streamTaskIsIdle(const SStreamTask* pTask) {
  int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue);
  if (numOfItems > 0) {
    return false;
  }

  numOfItems = taosQallItemSize(pTask->inputQueue->qall);
  if (numOfItems > 0) {
    return false;
  }

  // blocked by downstream task
  if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) {
    return false;
  }

  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
}

L
Liu Jicong 已提交
540
int32_t streamTryExec(SStreamTask* pTask) {
541
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
542
  int8_t schedStatus =
543
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
544

L
Liu Jicong 已提交
545 546 547
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
548
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
549 550
      return -1;
    }
551

552
    // todo the task should be commit here
553
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
554
    qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
555
           pTask->status.schedStatus);
L
Liu Jicong 已提交
556

dengyihao's avatar
dengyihao 已提交
557 558
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
559
      streamSchedExec(pTask);
L
Liu Jicong 已提交
560 561
    }
  }
562

L
Liu Jicong 已提交
563
  return 0;
L
Liu Jicong 已提交
564
}
L
liuyao 已提交
565 566 567 568 569 570 571 572 573 574 575 576

int32_t streamTaskReleaseState(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  int32_t code = qStreamOperatorReleaseState(exec);
  return code;
}

int32_t streamTaskReloadState(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  int32_t code = qStreamOperatorReloadState(exec);
  return code;
}