streamExec.c 16.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM 32
20
#define MIN_STREAM_EXEC_BATCH_NUM 4
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  100
5
54liuyao 已提交
22

Y
yihaoDeng 已提交
23
static int32_t updateCheckPointInfo(SStreamTask* pTask);
24

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
37 38
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
39
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
40 41 42 43 44
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
45
    SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
46
    if (pStreamBlocks == NULL) {
47
      qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
48
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
49 50 51
      return -1;
    }

Y
yihaoDeng 已提交
52 53
    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
           size / 1048576.0);
54 55

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
Y
yihaoDeng 已提交
56
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
57
      destroyStreamDataBlock(pStreamBlocks);
58 59
      return -1;
    }
60 61 62

    *totalSize += size;
    *totalBlocks += numOfBlocks;
63
  } else {
64
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
65 66 67 68 69
  }

  return TSDB_CODE_SUCCESS;
}

Y
yihaoDeng 已提交
70 71
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks) {
72 73
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
74

75 76 77 78 79
  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
H
Haojun Liao 已提交
80
  SArray* pRes = NULL;
L
Liu Jicong 已提交
81 82

  while (1) {
H
Haojun Liao 已提交
83 84 85
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }
H
Haojun Liao 已提交
86

87
    if (streamTaskShouldStop(&pTask->status)) {
Y
yihaoDeng 已提交
88
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
89 90 91
      return 0;
    }

L
Liu Jicong 已提交
92 93
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
94
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
95
      if (code == TSDB_CODE_QRY_IN_EXEC) {
96
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
97
      }
98 99

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
100
      continue;
L
Liu Jicong 已提交
101
    }
102

103
    if (output == NULL) {
5
54liuyao 已提交
104
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
Y
yihaoDeng 已提交
105
        SSDataBlock             block = {0};
Y
yihaoDeng 已提交
106
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
107
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
108

L
Liu Jicong 已提交
109
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
110
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
111 112
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
113
        numOfBlocks += 1;
H
Haojun Liao 已提交
114
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
L
Liu Jicong 已提交
115
               pRetrieveBlock->reqId);
116
      }
H
Haojun Liao 已提交
117

118 119
      break;
    }
L
Liu Jicong 已提交
120 121 122 123 124 125 126 127

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
128 129 130
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
H
Haojun Liao 已提交
131

132 133 134
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
135
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
136

137
    qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
138 139 140
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
141 142 143
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
144 145 146 147
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

H
Haojun Liao 已提交
148
      pRes = NULL;
149 150
      size = 0;
      numOfBlocks = 0;
151
    }
152
  }
153

154 155 156
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
157
  } else {
Y
yihaoDeng 已提交
158
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
159
  }
160

H
Haojun Liao 已提交
161
  return code;
L
Liu Jicong 已提交
162 163
}

164
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
165
  int32_t code = 0;
166

dengyihao's avatar
dengyihao 已提交
167
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
168
  void* exec = pTask->exec.pExecutor;
169

L
Liu Jicong 已提交
170
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
171
  bool finished = false;
L
Liu Jicong 已提交
172

173 174 175 176 177 178 179 180 181
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
182
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
183
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
184 185 186
        return 0;
      }

187 188 189
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
190
        continue;
191
      }
L
Liu Jicong 已提交
192
      if (output == NULL) {
L
Liu Jicong 已提交
193 194 195 196 197
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
198 199
        break;
      }
200 201 202 203 204 205

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
206 207
      batchCnt++;

208
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
209 210 211
      if (batchCnt >= batchSz) {
        break;
      }
212
    }
H
Haojun Liao 已提交
213

214
    if (taosArrayGetSize(pRes) == 0) {
H
Haojun Liao 已提交
215 216
      taosArrayDestroy(pRes);

217
      if (finished) {
H
Haojun Liao 已提交
218
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
219 220
        break;
      } else {
H
Haojun Liao 已提交
221
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
222 223
        continue;
      }
224
    }
H
Haojun Liao 已提交
225

S
Shengliang Guan 已提交
226
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
227 228 229 230 231 232 233 234
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
235
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
236
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
237
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
238
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
239 240
      return code;
    }
241 242 243 244 245
//
//    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
//      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
//      streamDispatchStreamBlock(pTask);
//    }
246 247 248 249

    if (finished) {
      break;
    }
250 251 252 253 254
  }
  return 0;
}

#if 0
255 256 257 258
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
259
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
260 261 262 263 264 265 266 267 268 269
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
270
    }
271
  }
272

273 274 275 276 277
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
278

279
  if (pTask->taskLevel == TASK_LEVEL__SINK) {
280
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
281
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
282 283
  }

284 285 286 287
  // exec impl

  // output
  // try dispatch
288 289
  return 0;
}
290
#endif
L
Liu Jicong 已提交
291

Y
yihaoDeng 已提交
292
int32_t updateCheckPointInfo(SStreamTask* pTask) {
293 294 295 296 297 298 299
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
Y
yihaoDeng 已提交
300 301
           ", checkPoint id:%" PRId64 " -> %" PRId64,
           pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                    const char* id) {
  int32_t retryTimes = 0;
  int32_t MAX_RETRY_TIMES = 5;

  while (1) {
    if (streamTaskShouldPause(&pTask->status)) {
      qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
        taosMsleep(10);
        qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
        continue;
      }

      qDebug("===stream===break batchSize:%d", *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }

    // do not merge blocks for sink node
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      *numOfBlocks = 1;
      *pInput = qItem;
      return TSDB_CODE_SUCCESS;
    }

351
    if (*pInput == NULL) {
352 353 354 355 356 357 358 359 360 361
      ASSERT((*numOfBlocks) == 0);
      *pInput = qItem;
    } else {
      // todo we need to sort the data block, instead of just appending into the array list.
      void* newRet = streamMergeQueueItem(*pInput, qItem);
      if (newRet == NULL) {
        qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
        streamQueueProcessFail(pTask->inputQueue);
        return TSDB_CODE_SUCCESS;
      }
362 363

      *pInput = newRet;
364 365 366 367 368 369 370 371 372 373 374 375
    }

    *numOfBlocks += 1;
    streamQueueProcessSuccess(pTask->inputQueue);

    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
      qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
      return TSDB_CODE_SUCCESS;
    }
  }
}

376 377 378 379
/**
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
L
Liu Jicong 已提交
380
int32_t streamExecForAll(SStreamTask* pTask) {
381 382
  const char* id = pTask->id.idStr;

L
Liu Jicong 已提交
383
  while (1) {
384
    int32_t batchSize = 0;
385 386
    SStreamQueueItem* pInput = NULL;

387
    // merge multiple input data if possible in the input queue.
388
    qDebug("s-task:%s start to extract data block from inputQ", id);
H
Haojun Liao 已提交
389

390
    /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
391
    if (pInput == NULL) {
392
      ASSERT(batchSize == 0);
L
Liu Jicong 已提交
393 394 395
      break;
    }

396
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
397
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
398
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
399
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
400
      continue;
L
Liu Jicong 已提交
401
    }
L
Liu Jicong 已提交
402

403 404 405
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
406 407 408 409
      if (status == TASK_STATUS__DROPPING) {
        break;
      }

410
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE && status != TASK_STATUS__STOP) {
411
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status);
412
        taosMsleep(100);
413
      } else {
414
        break;
415 416
      }
    }
417

418
    int64_t st = taosGetTimestampMs();
419
    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
H
Haojun Liao 已提交
420

421 422
    {
      // set input
423
      void* pExecutor = pTask->exec.pExecutor;
424 425 426 427 428 429 430 431 432

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
433
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
434 435 436 437 438 439
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
440
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
441 442 443 444 445 446
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
447
        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
448 449 450 451 452 453
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
454
      }
455
    }
456

457 458 459
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
460

461
    double  el = (taosGetTimestampMs() - st) / 1000.0;
462 463
    qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
           id, el, resSize / 1048576.0, totalBlocks);
464

465
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
466
  }
467

L
Liu Jicong 已提交
468
  return 0;
L
Liu Jicong 已提交
469 470
}

L
Liu Jicong 已提交
471
int32_t streamTryExec(SStreamTask* pTask) {
472
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
473
  int8_t schedStatus =
474
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
475

L
Liu Jicong 已提交
476 477 478
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
479
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
480 481
      return -1;
    }
482

483
    // todo the task should be commit here
484
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
485
    qDebug("s-task:%s exec completed", pTask->id.idStr);
L
Liu Jicong 已提交
486

dengyihao's avatar
dengyihao 已提交
487 488
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
489
      streamSchedExec(pTask);
L
Liu Jicong 已提交
490 491
    }
  }
492

L
Liu Jicong 已提交
493
  return 0;
L
Liu Jicong 已提交
494
}