streamExec.c 15.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// Maximum number of input block batches merged into one execution round (see extractMsgFromInputQ).
// One block may include several submit blocks.
#define MAX_STREAM_EXEC_BATCH_NUM 32
// Lower bound counterpart of the batch limit; not referenced in the visible part of this file.
#define MIN_STREAM_EXEC_BATCH_NUM 4
// Number of result blocks accumulated by streamTaskExecImpl before they are dumped to the output.
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  100
5
54liuyao 已提交
22

Y
yihaoDeng 已提交
23
// Forward declaration: the definition appears later in this file; doDumpResult calls it before dumping results.
static int32_t updateCheckPointInfo(SStreamTask* pTask);
24

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
/**
 * Update the checkpoint information and hand one batch of result blocks over to the task output.
 *
 * pRes is consumed on every path: it is either destroyed here or wrapped into a stream data block.
 * NOTE(review): the wrapped block presumably takes ownership of pRes (it is not freed separately on the
 * back-pressure path) -- confirm against createStreamBlockFromResults/destroyStreamDataBlock.
 *
 * @param pTask       the stream task
 * @param pItem       the input item the results were produced from
 * @param pRes        list of result SSDataBlock
 * @param size        accumulated size of the results in bytes, used for logging and statistics
 * @param totalSize   [in/out] increased by size when the results are accepted by the output
 * @param totalBlocks [in/out] increased by the number of blocks when the results are accepted by the output
 * @return TSDB_CODE_SUCCESS on success (including an empty result list), the code of updateCheckPointInfo if it
 *         fails, or -1 if the stream data block cannot be created or the output reports back pressure
 */
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  int32_t nBlocks = taosArrayGetSize(pRes);
  if (nBlocks <= 0) {
    // nothing to dispatch, only release the (empty) list
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOutput = createStreamBlockFromResults(pItem, pTask, size, pRes);
  if (pOutput == NULL) {
    qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, nBlocks,
         size / 1048576.0);

  code = streamTaskOutputResultBlock(pTask, pOutput);
  if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
    destroyStreamDataBlock(pOutput);
    return -1;
  }

  // only accepted results are counted in the statistics
  *totalSize += size;
  *totalBlocks += nBlocks;

  return TSDB_CODE_SUCCESS;
}

Y
yihaoDeng 已提交
70 71
/**
 * Run the executor of the task until it yields no more output for the current input, and dump the produced
 * result blocks to the task output in batches of at most MAX_STREAM_RESULT_DUMP_THRESHOLD blocks.
 *
 * @param pTask       the stream task; its executor must already have the input set
 * @param pItem       the input item being processed (used for the retrieve response and when building output blocks)
 * @param totalSize   [out] reset to 0, then increased by doDumpResult for every dumped batch
 * @param totalBlocks [out] reset to 0, then increased by doDumpResult for every dumped batch
 * @return 0 when the task should stop, -1 when the result list cannot be allocated, the failure code of
 *         doDumpResult when a batch cannot be dumped, otherwise the code of the final dump (or the last
 *         qExecTask return value when nothing was left to dump)
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    // the list is handed over to doDumpResult after each dump, so a fresh one is needed afterwards
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        // without this check the blocks pushed below would be written through a NULL list
        qError("s-task:%s failed to allocate the result block list", pTask->id.idStr);
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      // NOTE(review): the executor is simply invoked again; a persistent failure would keep looping here until the
      // task is stopped -- confirm this is intended.
      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      // no more output for this input; a retrieve request is answered with one STREAM_PULL_OVER block
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock             block = {0};
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
        numOfBlocks += 1;
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    // retrieve requests are broadcast to the children instead of being added to the results
    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // doDumpResult consumed the list; start a new batch
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}

164
/**
 * Run the scan executor of a source task and push the produced blocks to the task output, at most batchSz blocks
 * per output item, until the recover scan is reported as finished.
 *
 * @param pTask   the stream task, must be of level TASK_LEVEL__SOURCE
 * @param batchSz maximum number of result blocks packed into one output item
 * @return 0 when the scan finished or the task should stop/pause, -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 *         when an allocation fails, or TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output rejects the results
 */
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // collect up to batchSz blocks from the executor
    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        continue;
      }

      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // Nothing was produced in this round. The empty list must be released on both paths: the next round
      // allocates a new one, so keeping it on the "continue" path leaked one list per empty round.
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      }

      qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
      continue;
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      // the output did not accept the item, so both the block list and the queue item are released here
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (finished) {
      break;
    }
  }

  return 0;
}

#if 0
254 255 256 257
/**
 * Unfinished batch execution entry (compiled out by the surrounding #if 0).
 *
 * Reads all queued items, takes the first one, and for sink tasks forwards it to the output. Merging by batchLimit
 * and the executor invocation are not implemented yet.
 *
 * @param pTask      the stream task
 * @param batchLimit intended merge limit, currently unused
 * @return 0 normally, -1 when the first item is of type STREAM_INPUT__DESTROY
 */
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t fetched = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (fetched == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }

  SStreamQueueItem* pMergedItem = NULL;
  SStreamQueueItem* pHead = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pHead);

  // nothing fetched and nothing merged so far: done
  if (pHead == NULL && pMergedItem == NULL) {
    return 0;
  }
  // otherwise (no head item but a merged one): process merged item -- not implemented

  // if drop
  if (pHead->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->taskLevel == TASK_LEVEL__SINK) {
    ASSERT(pHead->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pHead);
  }

  // exec impl

  // output
  // try dispatch
  return 0;
}
289
#endif
L
Liu Jicong 已提交
290

Y
yihaoDeng 已提交
291
/**
 * Refresh the checkpoint information of the task from its executor and persist it when the checkpoint id advanced.
 *
 * The task meta is saved and committed while holding the write latch of the meta store.
 *
 * @param pTask the stream task
 * @return TSDB_CODE_SUCCESS when nothing changed or the new checkpoint was committed, -1 when the commit fails
 */
int32_t updateCheckPointInfo(SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId <= pCkInfo->id) {
    // checkpoint not advanced, nothing to save
    return TSDB_CODE_SUCCESS;
  }

  // save it since the checkpoint is updated
  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64,
         pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

  pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  int32_t commitRet = streamMetaCommit(pTask->pMeta);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitRet < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
/**
 * Pull items from the input queue of the task and merge them into one input item.
 *
 * Extraction ends when the task is paused, the queue is empty (source tasks poll a few more times with a short
 * sleep first), two items cannot be merged, or MAX_STREAM_EXEC_BATCH_NUM items have been merged. Sink tasks never
 * merge: the first item is returned as is.
 *
 * @param pTask       the stream task
 * @param pInput      [in/out] the (merged) input item; expected to be NULL on entry, left NULL when nothing was read
 * @param numOfBlocks [in/out] number of queue items represented by *pInput
 * @param id          task id string used in the log messages
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                    const char* id) {
  const int32_t maxRetries = 5;
  int32_t       numOfRetries = 0;

  for (;;) {
    if (streamTaskShouldPause(&pTask->status)) {
      qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    SStreamQueueItem* pNext = streamQueueNextItem(pTask->inputQueue);
    if (pNext == NULL) {
      // only source tasks wait for more data; the retry counter is not touched for other levels
      if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++numOfRetries) < maxRetries) {
        taosMsleep(10);
        qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, numOfRetries);
        continue;
      }

      qDebug("===stream===break batchSize:%d", *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    // do not merge blocks for sink node
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      *numOfBlocks = 1;
      *pInput = pNext;
      return TSDB_CODE_SUCCESS;
    }

    if (*pInput != NULL) {
      // todo we need to sort the data block, instead of just appending into the array list.
      void* pMerged = streamMergeQueueItem(*pInput, pNext);
      if (pMerged == NULL) {
        qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
        streamQueueProcessFail(pTask->inputQueue);
        return TSDB_CODE_SUCCESS;
      }

      *pInput = pMerged;
    } else {
      // first item of this batch
      ASSERT((*numOfBlocks) == 0);
      *pInput = pNext;
    }

    *numOfBlocks += 1;
    streamQueueProcessSuccess(pTask->inputQueue);

    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
      qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
      return TSDB_CODE_SUCCESS;
    }
  }
}

375 376 377 378
/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
379
int32_t streamExecForAll(SStreamTask* pTask) {
380 381
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
382
  while (1) {
383
    int32_t batchSize = 0;
384 385
    SStreamQueueItem* pInput = NULL;

386
    // merge multiple input data if possible in the input queue.
387
    qDebug("s-task:%s start to extract data block from inputQ", id);
H
Haojun Liao 已提交
388

389
    /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
390
    if (pInput == NULL) {
391
      ASSERT(batchSize == 0);
L
Liu Jicong 已提交
392 393 394
      break;
    }

395
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
396
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
397
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
398
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
399
      continue;
L
Liu Jicong 已提交
400
    }
L
Liu Jicong 已提交
401

402 403 404 405
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
406
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status);
407
        taosMsleep(100);
408
      } else {
409
        break;
410 411
      }
    }
412

413
    int64_t st = taosGetTimestampMs();
414
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
415

416 417
    {
      // set input
418
      void* pExecutor = pTask->exec.pExecutor;
419 420 421 422 423 424 425 426 427

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
428
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
429 430 431 432 433 434
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
435
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
436 437 438 439 440 441
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
442
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
443 444 445 446 447 448
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
449
      }
450
    }
451

452 453 454
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
455

456
    double  el = (taosGetTimestampMs() - st) / 1000.0;
457 458
    qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, el, resSize / 1048576.0, totalBlocks);
459

460
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
461
  }
462

L
Liu Jicong 已提交
463
  return 0;
L
Liu Jicong 已提交
464 465
}

L
Liu Jicong 已提交
466
/**
 * Execute the task if it is waiting to be scheduled.
 *
 * The schedule status is switched from WAITING to ACTIVE with an atomic compare-and-exchange, so only the caller
 * that wins the exchange runs the task. After the run the status becomes INACTIVE, and the task is scheduled again
 * when its input queue is not empty and the task is neither stopping nor paused.
 *
 * @param pTask the stream task
 * @return 0 when the task was executed or was not in the WAITING state, -1 when the execution failed (the schedule
 *         status is set to FAILED in that case)
 */
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevStatus != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  if (streamExecForAll(pTask) < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed", pTask->id.idStr);

  // new input may have arrived while the task was running; the checks are evaluated left to right and stop at
  // the first one that rules rescheduling out
  if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
        streamTaskShouldPause(&pTask->status))) {
    streamSchedExec(pTask);
  }

  return 0;
}