streamExec.c 16.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128
L
liuyao 已提交
20
#define MIN_STREAM_EXEC_BATCH_NUM 16
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24 25 26
static int32_t updateCheckPointInfo (SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);

  bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
33 34 35
  return (status == TASK_STATUS__PAUSE);
}

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes,
                            int32_t size, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
    SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
    if (pStreamBlocks == NULL) {
      return -1;
    }

    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
      taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(pStreamBlocks);
      return -1;
    }
  } else {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
  }

  *totalSize += size;
  *totalBlocks += numOfBlocks;

  ASSERT(taosArrayGetSize(pRes) == 0);
  return TSDB_CODE_SUCCESS;
}

69
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
70 71
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
72

73 74 75 76 77 78
  *totalBlocks = 0;
  *totalSize = 0;

  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  int32_t size = 0;
  int32_t numOfBlocks = 0;
L
Liu Jicong 已提交
79 80

  while (1) {
81
    if (streamTaskShouldStop(&pTask->status)) {
L
Liu Jicong 已提交
82 83 84
      return 0;
    }

L
Liu Jicong 已提交
85 86
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
87
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
88
      if (code == TSDB_CODE_QRY_IN_EXEC) {
89
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
90
      }
91 92

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
93
      continue;
L
Liu Jicong 已提交
94
    }
95

96
    if (output == NULL) {
5
54liuyao 已提交
97
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
98 99
        SSDataBlock block = {0};

100
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
101
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
102

L
Liu Jicong 已提交
103
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
104
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
105 106
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
L
Liu Jicong 已提交
107

H
Haojun Liao 已提交
108
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
L
Liu Jicong 已提交
109
               pRetrieveBlock->reqId);
110
      }
H
Haojun Liao 已提交
111

112 113
      break;
    }
L
Liu Jicong 已提交
114 115 116 117 118 119 120 121

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
122 123 124
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
H
Haojun Liao 已提交
125

126 127 128
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
129
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
130

131 132 133 134
    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
135 136 137
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
138 139 140 141
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

142 143 144
      size = 0;
      numOfBlocks = 0;
      ASSERT(taosArrayGetSize(pRes) == 0);
145
    }
146
  }
147

148 149 150 151 152 153
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
154 155

    ASSERT(taosArrayGetSize(pRes) == 0);
L
Liu Jicong 已提交
156
  }
157

L
Liu Jicong 已提交
158 159 160
  return 0;
}

161
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
162
  int32_t code = 0;
163

dengyihao's avatar
dengyihao 已提交
164
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
165

166
  void* exec = pTask->exec.pExecutor;
167

L
Liu Jicong 已提交
168
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
169
  bool finished = false;
L
Liu Jicong 已提交
170

171 172 173 174 175 176 177 178 179
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
180
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
181
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
182 183 184
        return 0;
      }

185 186 187
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
188
        continue;
189
      }
L
Liu Jicong 已提交
190
      if (output == NULL) {
L
Liu Jicong 已提交
191 192 193 194 195
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
196 197
        break;
      }
198 199 200 201 202 203

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
204 205
      batchCnt++;

206
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
207 208 209
      if (batchCnt >= batchSz) {
        break;
      }
210
    }
H
Haojun Liao 已提交
211

212
    if (taosArrayGetSize(pRes) == 0) {
213 214
      if (finished) {
        taosArrayDestroy(pRes);
H
Haojun Liao 已提交
215
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
216 217
        break;
      } else {
H
Haojun Liao 已提交
218
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
219 220
        continue;
      }
221
    }
H
Haojun Liao 已提交
222

S
Shengliang Guan 已提交
223
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
224 225 226 227 228 229 230 231
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
232
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
233
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
234
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
235
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
236 237
      return code;
    }
L
Liu Jicong 已提交
238 239

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
240
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
L
Liu Jicong 已提交
241 242
      streamDispatch(pTask);
    }
243 244 245 246

    if (finished) {
      break;
    }
247 248 249 250 251
  }
  return 0;
}

#if 0
252 253 254 255
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
256
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
257 258 259 260 261 262 263 264 265 266
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
267
    }
268
  }
269

270 271 272 273 274
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
275

276
  if (pTask->taskLevel == TASK_LEVEL__SINK) {
277
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
278
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
279 280
  }

281 282 283 284
  // exec impl

  // output
  // try dispatch
285 286
  return 0;
}
287
#endif
L
Liu Jicong 已提交
288

289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
           ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pStreamBlocks == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
  pStreamBlocks->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pSubmit->ver;
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pMerged->ver;
  }

  return pStreamBlocks;
}

L
Liu Jicong 已提交
340
int32_t streamExecForAll(SStreamTask* pTask) {
dengyihao's avatar
dengyihao 已提交
341
  int32_t code = 0;
L
Liu Jicong 已提交
342
  while (1) {
343
    int32_t batchSize = 1;
L
liuyao 已提交
344
    int16_t times = 0;
345

346 347
    SStreamQueueItem* pInput = NULL;

348
    // merge multiple input data if possible in the input queue.
H
Haojun Liao 已提交
349 350
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

L
Liu Jicong 已提交
351
    while (1) {
L
liuyao 已提交
352 353 354
      if (streamTaskShouldPause(&pTask->status)) {
        return 0;
      }
355

L
Liu Jicong 已提交
356 357
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
L
liuyao 已提交
358 359 360
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
361
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
362 363
          continue;
        }
364

L
liuyao 已提交
365
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
366
        break;
L
Liu Jicong 已提交
367
      }
368 369 370

      if (pInput == NULL) {
        pInput = qItem;
371
        streamQueueProcessSuccess(pTask->inputQueue);
372
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
373
          break;
L
Liu Jicong 已提交
374
        }
L
Liu Jicong 已提交
375
      } else {
376
        // todo we need to sort the data block, instead of just appending into the array list.
377 378
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
379 380 381
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
382 383
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
384
          streamQueueProcessSuccess(pTask->inputQueue);
L
liuyao 已提交
385
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
H
Haojun Liao 已提交
386
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
5
54liuyao 已提交
387 388
            break;
          }
L
Liu Jicong 已提交
389
        }
L
Liu Jicong 已提交
390 391
      }
    }
392

393
    if (streamTaskShouldStop(&pTask->status)) {
394 395 396
      if (pInput) {
        streamFreeQitem(pInput);
      }
397

L
Liu Jicong 已提交
398
      return 0;
L
Liu Jicong 已提交
399
    }
L
Liu Jicong 已提交
400

401
    if (pInput == NULL) {
L
Liu Jicong 已提交
402 403 404
      break;
    }

405
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
406
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
407
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
408
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
409
      continue;
L
Liu Jicong 已提交
410
    }
L
Liu Jicong 已提交
411

412 413 414 415 416 417 418
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
               atomic_load_8(&pTask->status.taskStatus));
        taosMsleep(2);
419
      } else {
420
        break;
421 422
      }
    }
423

424 425
    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
H
Haojun Liao 已提交
426

427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
460
      }
461
    }
462

463 464 465
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
466

467 468
    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
469
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
470
  }
471

L
Liu Jicong 已提交
472
  return 0;
L
Liu Jicong 已提交
473 474
}

L
Liu Jicong 已提交
475
int32_t streamTryExec(SStreamTask* pTask) {
476
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
477
  int8_t schedStatus =
478
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
479

L
Liu Jicong 已提交
480 481 482
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
483
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
484 485
      return -1;
    }
486

487
    // todo the task should be commit here
488
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
489
    qDebug("s-task:%s exec completed", pTask->id.idStr);
L
Liu Jicong 已提交
490

dengyihao's avatar
dengyihao 已提交
491 492
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
493
      streamSchedExec(pTask);
L
Liu Jicong 已提交
494 495
    }
  }
496

L
Liu Jicong 已提交
497
  return 0;
L
Liu Jicong 已提交
498
}