streamExec.c 15.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM 32
20
#define MIN_STREAM_EXEC_BATCH_NUM 4
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  100
5
54liuyao 已提交
22

23 24
static int32_t updateCheckPointInfo (SStreamTask* pTask);

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
37 38
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
39
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
40 41 42 43 44
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
45
    SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
46
    if (pStreamBlocks == NULL) {
47
      qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
48
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
49 50 51 52 53 54 55
      return -1;
    }

    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
56
      destroyStreamDataBlock(pStreamBlocks);
57 58
      return -1;
    }
59 60 61

    *totalSize += size;
    *totalBlocks += numOfBlocks;
62
  } else {
63
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
64 65 66 67 68
  }

  return TSDB_CODE_SUCCESS;
}

69
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
70 71
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
72

73 74 75 76 77
  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
H
Haojun Liao 已提交
78
  SArray* pRes = NULL;
L
Liu Jicong 已提交
79 80

  while (1) {
H
Haojun Liao 已提交
81 82 83
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }
H
Haojun Liao 已提交
84

85
    if (streamTaskShouldStop(&pTask->status)) {
86
      taosArrayDestroy(pRes); // memory leak
L
Liu Jicong 已提交
87 88 89
      return 0;
    }

L
Liu Jicong 已提交
90 91
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
92
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
93
      if (code == TSDB_CODE_QRY_IN_EXEC) {
94
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
95
      }
96 97

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
98
      continue;
L
Liu Jicong 已提交
99
    }
100

101
    if (output == NULL) {
5
54liuyao 已提交
102
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
103 104
        SSDataBlock block = {0};

105
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
106
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
107

L
Liu Jicong 已提交
108
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
109
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
110 111
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
112
        numOfBlocks += 1;
H
Haojun Liao 已提交
113
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
L
Liu Jicong 已提交
114
               pRetrieveBlock->reqId);
115
      }
H
Haojun Liao 已提交
116

117 118
      break;
    }
L
Liu Jicong 已提交
119 120 121 122 123 124 125 126

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
127 128 129
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
H
Haojun Liao 已提交
130

131 132 133
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
134
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
135

136 137 138 139
    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
140 141 142
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
143 144 145 146
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

H
Haojun Liao 已提交
147
      pRes = NULL;
148 149
      size = 0;
      numOfBlocks = 0;
150
    }
151
  }
152

153 154 155
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
156 157
  } else {
    taosArrayDestroy(pRes);
L
Liu Jicong 已提交
158
  }
159

H
Haojun Liao 已提交
160
  return code;
L
Liu Jicong 已提交
161 162
}

163
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
164
  int32_t code = 0;
165

dengyihao's avatar
dengyihao 已提交
166
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
167
  void* exec = pTask->exec.pExecutor;
168

L
Liu Jicong 已提交
169
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
170
  bool finished = false;
L
Liu Jicong 已提交
171

172 173 174 175 176 177 178 179 180
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
181
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
182
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
183 184 185
        return 0;
      }

186 187 188
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
189
        continue;
190
      }
L
Liu Jicong 已提交
191
      if (output == NULL) {
L
Liu Jicong 已提交
192 193 194 195 196
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
197 198
        break;
      }
199 200 201 202 203 204

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
205 206
      batchCnt++;

207
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
208 209 210
      if (batchCnt >= batchSz) {
        break;
      }
211
    }
H
Haojun Liao 已提交
212

213
    if (taosArrayGetSize(pRes) == 0) {
214 215
      if (finished) {
        taosArrayDestroy(pRes);
H
Haojun Liao 已提交
216
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
217 218
        break;
      } else {
H
Haojun Liao 已提交
219
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
220 221
        continue;
      }
222
    }
H
Haojun Liao 已提交
223

S
Shengliang Guan 已提交
224
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
225 226 227 228 229 230 231 232
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
233
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
234
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
235
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
236
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
237 238
      return code;
    }
239 240 241 242 243
//
//    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
//      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
//      streamDispatchStreamBlock(pTask);
//    }
244 245 246 247

    if (finished) {
      break;
    }
248 249 250 251 252
  }
  return 0;
}

#if 0
253 254 255 256
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
257
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
258 259 260 261 262 263 264 265 266 267
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
268
    }
269
  }
270

271 272 273 274 275
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
276

277
  if (pTask->taskLevel == TASK_LEVEL__SINK) {
278
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
279
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
280 281
  }

282 283 284 285
  // exec impl

  // output
  // try dispatch
286 287
  return 0;
}
288
#endif
L
Liu Jicong 已提交
289

290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
           ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

318 319 320 321
/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
322
int32_t streamExecForAll(SStreamTask* pTask) {
323 324
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
325
  while (1) {
326
    int32_t batchSize = 1;
L
liuyao 已提交
327
    int16_t times = 0;
328

329 330
    SStreamQueueItem* pInput = NULL;

331
    // merge multiple input data if possible in the input queue.
332
    qDebug("s-task:%s start to extract data block from inputQ", id);
H
Haojun Liao 已提交
333

L
Liu Jicong 已提交
334
    while (1) {
L
liuyao 已提交
335
      if (streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
336 337 338 339 340
        if (batchSize > 1) {
          break;
        } else {
          return 0;
        }
L
liuyao 已提交
341
      }
342

L
Liu Jicong 已提交
343 344
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
L
liuyao 已提交
345 346
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
347
          taosMsleep(10);
348
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
349 350
          continue;
        }
351

L
liuyao 已提交
352
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
353
        break;
L
Liu Jicong 已提交
354
      }
355 356 357

      if (pInput == NULL) {
        pInput = qItem;
358
        streamQueueProcessSuccess(pTask->inputQueue);
359
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
360
          break;
L
Liu Jicong 已提交
361
        }
L
Liu Jicong 已提交
362
      } else {
363
        // todo we need to sort the data block, instead of just appending into the array list.
364 365
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
366 367 368
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
369 370
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
371
          streamQueueProcessSuccess(pTask->inputQueue);
372

L
liuyao 已提交
373
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
374 375
            qDebug("s-task:%s maximum batch limit:%d reached, processing this batch of blocks", id,
                   MAX_STREAM_EXEC_BATCH_NUM);
5
54liuyao 已提交
376 377
            break;
          }
L
Liu Jicong 已提交
378
        }
L
Liu Jicong 已提交
379 380
      }
    }
381

382
    if (streamTaskShouldStop(&pTask->status)) {
383 384 385
      if (pInput) {
        streamFreeQitem(pInput);
      }
L
Liu Jicong 已提交
386
      return 0;
L
Liu Jicong 已提交
387
    }
L
Liu Jicong 已提交
388

389
    if (pInput == NULL) {
L
Liu Jicong 已提交
390 391 392
      break;
    }

393
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
394
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
395
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
396
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
397
      continue;
L
Liu Jicong 已提交
398
    }
L
Liu Jicong 已提交
399

400 401 402 403
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
404
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
405
               atomic_load_8(&pTask->status.taskStatus));
406
        taosMsleep(100);
407
      } else {
408
        break;
409 410
      }
    }
411

412
    int64_t st = taosGetTimestampMs();
413
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
414

415 416
    {
      // set input
417
      void* pExecutor = pTask->exec.pExecutor;
418 419 420 421 422 423 424 425 426

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
427
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
428 429 430 431 432 433
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
434
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
435 436 437 438 439 440
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
441
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
442 443 444 445 446 447
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
448
      }
449
    }
450

451 452 453
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
454

455
    double  el = (taosGetTimestampMs() - st) / 1000.0;
456 457
    qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, el, resSize / 1048576.0, totalBlocks);
458
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
459
  }
460

L
Liu Jicong 已提交
461
  return 0;
L
Liu Jicong 已提交
462 463
}

L
Liu Jicong 已提交
464
int32_t streamTryExec(SStreamTask* pTask) {
465
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
466
  int8_t schedStatus =
467
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
468

L
Liu Jicong 已提交
469 470 471
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
472
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
473 474
      return -1;
    }
475

476
    // todo the task should be commit here
477
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
478
    qDebug("s-task:%s exec completed", pTask->id.idStr);
L
Liu Jicong 已提交
479

dengyihao's avatar
dengyihao 已提交
480 481
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
482
      streamSchedExec(pTask);
L
Liu Jicong 已提交
483 484
    }
  }
485

L
Liu Jicong 已提交
486
  return 0;
L
Liu Jicong 已提交
487
}