/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
// Upper bound on the number of input queue items merged into one batch by streamExecForAll().
// One block may include several submit blocks.
#define MAX_STREAM_EXEC_BATCH_NUM 32
// A source-level task that has merged fewer items than this keeps polling its input queue
// (a bounded number of short sleeps) before it starts to process the batch.
#define MIN_STREAM_EXEC_BATCH_NUM 4
// Number of result blocks accumulated by streamTaskExecImpl() before they are dumped to the output.
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

/**
 * Flush one batch of executor results to the task output.
 *
 * The checkpoint info is refreshed first through updateCheckPointInfo(); afterwards the accumulated blocks are wrapped
 * into a stream data block and handed to streamTaskOutputResultBlock().
 *
 * @param pTask        task that produced the results.
 * @param pItem        input item the results were computed from; forwarded to createStreamBlockFromResults().
 * @param pRes         array of result blocks. It is destroyed here when it is empty and on every failure path, so the
 *                     caller must not touch it after this call. On the success path it is passed to
 *                     createStreamBlockFromResults() (presumably taking ownership of it -- verify against that helper).
 * @param size         size of the results in bytes; used for logging and added to *totalSize.
 * @param totalSize    in/out accumulator of dumped bytes; only updated when the blocks were dumped.
 * @param totalBlocks  in/out accumulator of dumped blocks; only updated when the blocks were dumped.
 * @return TSDB_CODE_SUCCESS on success (including the empty case); the code of updateCheckPointInfo() when it fails;
 *         -1 when the stream data block cannot be created or the output reports TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY.
 */
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
    SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
    if (pStreamBlocks == NULL) {
      qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return -1;
    }

    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
           size / 1048576.0);

    // NOTE: only the queue-out-of-memory code is treated as a failure here; any other code is ignored.
    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
      destroyStreamDataBlock(pStreamBlocks);
      return -1;
    }

    *totalSize += size;
    *totalBlocks += numOfBlocks;
  } else {
    // nothing to dump, just release the empty container
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Drive the executor of the task until it produces no more output for the input that has already been set, and dump
 * the produced blocks through doDumpResult().
 *
 * Results are accumulated in a temporary array and dumped every MAX_STREAM_RESULT_DUMP_THRESHOLD blocks, plus once
 * more for the remainder after the executor is drained.
 *
 * @param pTask        task to execute; pTask->exec.pExecutor is the executor handle.
 * @param pItem        input item currently processed. For a STREAM_INPUT__DATA_RETRIEVE item, one extra
 *                     STREAM_PULL_OVER block is appended once the executor is drained.
 * @param totalSize    out: total size in bytes of the dumped results; reset to 0 on entry.
 * @param totalBlocks  out: total number of dumped blocks; reset to 0 on entry.
 * @return 0 when a stop is requested (blocks not yet dumped are discarded); -1 when the result array cannot be
 *         allocated; otherwise the code of the last doDumpResult() call, or TSDB_CODE_SUCCESS if nothing was dumped.
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    // pRes is NULL on the first round and after each dump, since doDumpResult() consumes the array
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        // without this check the pushes below would operate on a NULL array
        qError("s-task:%s failed to allocate the result block list", pTask->id.idStr);
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      // NOTE(review): the execution is retried without any bound; a persistent error only ends on a stop request
      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      // the executor is drained for the current input
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock             block = {0};
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->info.selfChildId;
        taosArrayPush(pRes, &block);
        numOfBlocks += 1;
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->info.selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        // doDumpResult() has already released pRes on its failure paths
        return code;
      }

      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}

/**
 * Run the scan executor of a source task and push the produced blocks to the task output in batches.
 *
 * Each round collects at most batchSz blocks into a fresh array, wraps them into a STREAM_INPUT__DATA_BLOCK queue item
 * and hands the item to streamTaskOutputResultBlock(). The loop ends when qStreamRecoverScanFinished() reports that
 * the scan is finished, when a stop is requested, or when a pause is requested while the executor has no output.
 *
 * @param pTask    source-level task (asserted); pTask->exec.pExecutor is the executor handle.
 * @param batchSz  maximum number of blocks placed into one output item.
 * @return 0 on normal completion, stop or pause; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when an allocation
 *         fails; TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output rejects the item.
 */
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        // NOTE(review): retried without any bound; only a stop request ends a persistent failure
        continue;
      }

      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
          if (streamTaskShouldPause(&pTask->status)) {
            // flush what has been collected so far (possibly nothing) and leave
            SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
            if (qRes == NULL) {
              taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
              terrno = TSDB_CODE_OUT_OF_MEMORY;
              return -1;
            }

            qRes->type = STREAM_INPUT__DATA_BLOCK;
            qRes->blocks = pRes;
            code = streamTaskOutputResultBlock(pTask, qRes);
            if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
              taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
              taosFreeQitem(qRes);
              return code;
            }
            return 0;
          }
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->info.selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // The empty array must be released on both paths: the next round allocates a new one, so skipping the
      // release before "continue" would leak one array per empty round.
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      } else {
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
        continue;
      }
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (finished) {
      break;
    }
  }
  return 0;
}

// NOTE: everything between "#if 0" and "#endif" is compiled out; it is an unfinished draft of a batch executor.
#if 0
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
    }
  }

  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
  }

  // exec impl

  // output
  // try dispatch
  return 0;
}
#endif
/**
 * Refresh the checkpoint info of the task from its executor and persist it when the checkpoint id has advanced.
 *
 * @param pTask task whose chkInfo is compared against the values reported by qGetCheckpointVersion().
 * @return TSDB_CODE_SUCCESS when nothing changed or the new info was committed; -1 when streamMetaCommit() fails.
 *         Note that pTask->chkInfo has already been overwritten at that point and is not rolled back.
 */
static int32_t updateCheckPointInfo(SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId <= pCkInfo->id) {
    // the checkpoint has not advanced, nothing to save
    return TSDB_CODE_SUCCESS;
  }

  // save it since the checkpoint is updated
  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64,
         pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

  // currentVer is carried over; every field not listed here is zero-initialized by the compound literal
  pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

  // save and commit under the write latch of the meta; the latch is released before logging the outcome
  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  int32_t ret = streamMetaCommit(pTask->pMeta);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (ret < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

/**
 * Block the calling thread until streamTaskIsIdle() reports that pStreamTask is idle, polling every 100ms.
 *
 * @param pTask        the waiting task; only used for logging.
 * @param pStreamTask  the task that has to become idle.
 *
 * NOTE(review): there is no timeout; the wait lasts as long as the stream task stays busy.
 */
static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  // wait for the stream task to be idle
  int64_t st = taosGetTimestampMs();

  while (!streamTaskIsIdle(pStreamTask)) {
    qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
           pTask->info.taskLevel, pStreamTask->id.idStr);
    taosMsleep(100);
  }

  // elapsed time in seconds; only reported when some time has actually passed
  double el = (taosGetTimestampMs() - st) / 1000.0;
  if (el > 0) {
    qDebug("s-task:%s wait for stream task:%s for %.2fs to handle all data in inputQ", pTask->id.idStr,
           pStreamTask->id.idStr, el);
  }
}

/**
 * Hand over from a finished scan-history task to its related stream task: halt the stream task if needed, wait until
 * it is idle, move the start of its scan window to INT64_MIN, then call streamSetStatusNormal() and
 * streamSchedExec() on it.
 *
 * @param pTask the scan-history task; pTask->streamTaskId.taskId identifies the related stream task.
 * @return TSDB_CODE_SUCCESS on success; -1 when the related stream task cannot be acquired.
 */
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    // The task must be checked before it is used in the log below.
    // todo handle stream task is dropped here
    qDebug("s-task:%s failed to acquire the related stream task, it may have been dropped", pTask->id.idStr);
    return -1;
  }

  qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr,
         pStreamTask->id.idStr);

  ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId);
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // here we need to wait for the stream task handle all data in the input queue.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
  } else {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
  }

  // wait for the stream task to be idle
  waitForTaskTobeIdle(pTask, pStreamTask);

  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
               ", status:%s, sched-status:%d",
           pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
           pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);

    // todo transfer state
  } else {
    // for sink tasks, they are continue to execute, no need to be halt.
    // the process should be stopped for a while, during the term of transfer task state.
    // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
    qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);

    // todo transfer state
  }

  // expand the query time window for stream scanner
  pTimeWindow->skey = INT64_MIN;

  streamSetStatusNormal(pStreamTask);
  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pTask->pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}

/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
401
int32_t streamExecForAll(SStreamTask* pTask) {
402 403
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
404
  while (1) {
405
    int32_t batchSize = 1;
L
liuyao 已提交
406
    int16_t times = 0;
407

408 409
    SStreamQueueItem* pInput = NULL;

410
    // merge multiple input data if possible in the input queue.
411
    qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
H
Haojun Liao 已提交
412

L
Liu Jicong 已提交
413
    while (1) {
414
      // downstream task's input queue is blocked, stop immediately
415
      if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
H
Haojun Liao 已提交
416
          streamTaskShouldStop(&pTask->status)) {
L
liuyao 已提交
417 418 419
        if (batchSize > 1) {
          break;
        } else {
420
          qDebug("123 %s", pTask->id.idStr);
L
liuyao 已提交
421 422
          return 0;
        }
L
liuyao 已提交
423
      }
424

L
Liu Jicong 已提交
425 426
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
427
        if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
L
liuyao 已提交
428
          times++;
429
          taosMsleep(10);
430
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
431 432
          continue;
        }
433

L
liuyao 已提交
434
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
435
        break;
L
Liu Jicong 已提交
436
      }
437 438 439

      if (pInput == NULL) {
        pInput = qItem;
440
        streamQueueProcessSuccess(pTask->inputQueue);
441
        if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
442
          break;
L
Liu Jicong 已提交
443
        }
L
Liu Jicong 已提交
444
      } else {
445
        // todo we need to sort the data block, instead of just appending into the array list.
446 447
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
448 449 450
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
451 452
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
453
          streamQueueProcessSuccess(pTask->inputQueue);
454

L
liuyao 已提交
455
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
456
            qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
457
                   MAX_STREAM_EXEC_BATCH_NUM);
5
54liuyao 已提交
458 459
            break;
          }
L
Liu Jicong 已提交
460
        }
L
Liu Jicong 已提交
461 462
      }
    }
463

464
    if (streamTaskShouldStop(&pTask->status)) {
465 466 467
      if (pInput) {
        streamFreeQitem(pInput);
      }
L
Liu Jicong 已提交
468
      return 0;
L
Liu Jicong 已提交
469
    }
L
Liu Jicong 已提交
470

471
    if (pInput == NULL) {
472 473
      if (pTask->info.fillHistory && pTask->status.transferState) {
        int32_t code = streamTransferStateToStreamTask(pTask);
H
Haojun Liao 已提交
474
      }
475

L
Liu Jicong 已提交
476 477 478
      break;
    }

479
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
480
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
481
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
482
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
483
      continue;
L
Liu Jicong 已提交
484
    }
L
Liu Jicong 已提交
485

486
    int64_t st = taosGetTimestampMs();
487
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
488

489 490
    {
      // set input
491
      void* pExecutor = pTask->exec.pExecutor;
492 493 494 495 496 497

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
498
        ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
499 500
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
501
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
502 503 504 505 506 507
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
508
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
509 510 511 512 513 514
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
515
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
516 517 518 519 520 521
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
522
      }
523
    }
524

525 526 527
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
528

529
    double  el = (taosGetTimestampMs() - st) / 1000.0;
H
Haojun Liao 已提交
530 531
    qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, batchSize, el, resSize / 1048576.0, totalBlocks);
532

533
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
534
  }
535

L
Liu Jicong 已提交
536
  return 0;
L
Liu Jicong 已提交
537 538
}

/**
 * Tell whether the task currently has nothing to do.
 *
 * @param pTask task to inspect.
 * @return true only when both containers of the input queue (queue and qall) are empty, the output is not in
 *         TASK_OUTPUT_STATUS__BLOCKED state, and the sched status is TASK_SCHED_STATUS__INACTIVE.
 */
bool streamTaskIsIdle(const SStreamTask* pTask) {
  // pending input: the second container is only inspected when the first one is empty
  const bool hasPendingInput =
      (taosQueueItemSize(pTask->inputQueue->queue) > 0) || (taosQallItemSize(pTask->inputQueue->qall) > 0);
  if (hasPendingInput) {
    return false;
  }

  // blocked by downstream task
  const bool blockedByDownstream = (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED);
  return !blockedByDownstream && (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
}

/**
 * Execute the task if it is waiting to be scheduled.
 *
 * The sched status is switched from TASK_SCHED_STATUS__WAITING to TASK_SCHED_STATUS__ACTIVE with an atomic
 * compare-and-exchange, so that only the caller that performs the switch runs streamExecForAll().
 *
 * @param pTask task to execute.
 * @return -1 when streamExecForAll() fails (the sched status is then set to TASK_SCHED_STATUS__FAILED); 0 otherwise,
 *         including the case where the task was not in the waiting state and nothing was executed.
 */
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t schedStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (schedStatus != TASK_SCHED_STATUS__WAITING) {
    // the task is not waiting to be scheduled, nothing to do for this caller
    return 0;
  }

  int32_t code = streamExecForAll(pTask);
  if (code < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);

  // new items may have arrived in the meantime; schedule again unless the task is stopping or pausing
  if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
      (!streamTaskShouldPause(&pTask->status))) {
    streamSchedExec(pTask);
  }

  return 0;
}

/**
 * Ask the executor of the task to release its operator state.
 *
 * @param pTask task whose executor (pTask->exec.pExecutor) is addressed.
 * @return the code of qStreamOperatorReleaseState(), or TSDB_CODE_SUCCESS when the task has no executor.
 */
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;
  if (pExecutor != NULL) {
    int32_t code = qStreamOperatorReleaseState(pExecutor);
    return code;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

/**
 * Ask the executor of the task to reload its operator state.
 *
 * @param pTask task whose executor (pTask->exec.pExecutor) is addressed.
 * @return the code of qStreamOperatorReloadState(), or TSDB_CODE_SUCCESS when the task has no executor.
 */
int32_t streamTaskReloadState(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;
  if (pExecutor != NULL) {
    int32_t code = qStreamOperatorReloadState(pExecutor);
    return code;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}