streamExec.c 16.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128
L
liuyao 已提交
20
#define MIN_STREAM_EXEC_BATCH_NUM 16
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24 25
static int32_t updateCheckPointInfo (SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
33 34 35
  return (status == TASK_STATUS__PAUSE);
}

36 37
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
38 39
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
40
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
41 42 43 44 45 46 47
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
    SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
    if (pStreamBlocks == NULL) {
48
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
49 50 51 52 53 54 55
      return -1;
    }

    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
56
      destroyStreamDataBlock(pStreamBlocks);
57 58
      return -1;
    }
59 60 61 62 63 64

    *totalSize += size;
    *totalBlocks += numOfBlocks;

    ASSERT(taosArrayGetSize(pRes) == 0);
    destroyStreamDataBlock(pStreamBlocks);
65
  } else {
66
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
67 68 69 70 71
  }

  return TSDB_CODE_SUCCESS;
}

72
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
73 74
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
75

76 77 78 79 80
  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
81
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
L
Liu Jicong 已提交
82 83

  while (1) {
84
    if (streamTaskShouldStop(&pTask->status)) {
85
      taosArrayDestroy(pRes); // memory leak
L
Liu Jicong 已提交
86 87 88
      return 0;
    }

L
Liu Jicong 已提交
89 90
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
91
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
92
      if (code == TSDB_CODE_QRY_IN_EXEC) {
93
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
94
      }
95 96

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
97
      continue;
L
Liu Jicong 已提交
98
    }
99

100
    if (output == NULL) {
5
54liuyao 已提交
101
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
102 103
        SSDataBlock block = {0};

104
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
105
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
106

L
Liu Jicong 已提交
107
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
108
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
109 110
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
L
Liu Jicong 已提交
111

H
Haojun Liao 已提交
112
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
L
Liu Jicong 已提交
113
               pRetrieveBlock->reqId);
114
      }
H
Haojun Liao 已提交
115

116 117
      break;
    }
L
Liu Jicong 已提交
118 119 120 121 122 123 124 125

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
126 127 128
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
H
Haojun Liao 已提交
129

130 131 132
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
133
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
134

135 136 137 138
    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
139 140 141
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
142 143 144 145
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

146 147
      size = 0;
      numOfBlocks = 0;
148
    }
149
  }
150

151 152 153 154 155 156
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
157 158
  } else {
    taosArrayDestroy(pRes);
L
Liu Jicong 已提交
159
  }
160

L
Liu Jicong 已提交
161 162 163
  return 0;
}

164
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
165
  int32_t code = 0;
166

dengyihao's avatar
dengyihao 已提交
167
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
168

169
  void* exec = pTask->exec.pExecutor;
170

L
Liu Jicong 已提交
171
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
172
  bool finished = false;
L
Liu Jicong 已提交
173

174 175 176 177 178 179 180 181 182
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
183
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
184
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
185 186 187
        return 0;
      }

188 189 190
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
191
        continue;
192
      }
L
Liu Jicong 已提交
193
      if (output == NULL) {
L
Liu Jicong 已提交
194 195 196 197 198
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
199 200
        break;
      }
201 202 203 204 205 206

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
207 208
      batchCnt++;

209
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
210 211 212
      if (batchCnt >= batchSz) {
        break;
      }
213
    }
H
Haojun Liao 已提交
214

215
    if (taosArrayGetSize(pRes) == 0) {
216 217
      if (finished) {
        taosArrayDestroy(pRes);
H
Haojun Liao 已提交
218
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
219 220
        break;
      } else {
H
Haojun Liao 已提交
221
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
222 223
        continue;
      }
224
    }
H
Haojun Liao 已提交
225

S
Shengliang Guan 已提交
226
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
227 228 229 230 231 232 233 234
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
235
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
236
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
237
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
238
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
239 240
      return code;
    }
L
Liu Jicong 已提交
241 242

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
243
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
244 245 246 247

      SStreamDataBlock* pBlock = NULL;
      streamDispatch(pTask, &pBlock);
      destroyStreamDataBlock(pBlock);
L
Liu Jicong 已提交
248
    }
249 250 251 252

    if (finished) {
      break;
    }
253 254 255 256 257
  }
  return 0;
}

#if 0
258 259 260 261
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
262
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
263 264 265 266 267 268 269 270 271 272
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
273
    }
274
  }
275

276 277 278 279 280
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
281

282
  if (pTask->taskLevel == TASK_LEVEL__SINK) {
283
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
284
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
285 286
  }

287 288 289 290
  // exec impl

  // output
  // try dispatch
291 292
  return 0;
}
293
#endif
L
Liu Jicong 已提交
294

295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
           ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pStreamBlocks == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
  pStreamBlocks->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pSubmit->ver;
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pMerged->ver;
  }

  return pStreamBlocks;
}

346
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
H
Haojun Liao 已提交
347 348 349 350
  if (pBlock == NULL) {
    return;
  }

351 352 353 354
  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
}

L
Liu Jicong 已提交
355
int32_t streamExecForAll(SStreamTask* pTask) {
dengyihao's avatar
dengyihao 已提交
356
  int32_t code = 0;
L
Liu Jicong 已提交
357
  while (1) {
358
    int32_t batchSize = 1;
L
liuyao 已提交
359
    int16_t times = 0;
360

361 362
    SStreamQueueItem* pInput = NULL;

363
    // merge multiple input data if possible in the input queue.
H
Haojun Liao 已提交
364 365
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

L
Liu Jicong 已提交
366
    while (1) {
L
liuyao 已提交
367 368 369
      if (streamTaskShouldPause(&pTask->status)) {
        return 0;
      }
370

L
Liu Jicong 已提交
371 372
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
L
liuyao 已提交
373 374 375
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
376
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
377 378
          continue;
        }
379

L
liuyao 已提交
380
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
381
        break;
L
Liu Jicong 已提交
382
      }
383 384 385

      if (pInput == NULL) {
        pInput = qItem;
386
        streamQueueProcessSuccess(pTask->inputQueue);
387
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
388
          break;
L
Liu Jicong 已提交
389
        }
L
Liu Jicong 已提交
390
      } else {
391
        // todo we need to sort the data block, instead of just appending into the array list.
392 393
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
394 395 396
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
397 398
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
399
          streamQueueProcessSuccess(pTask->inputQueue);
L
liuyao 已提交
400
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
H
Haojun Liao 已提交
401
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
5
54liuyao 已提交
402 403
            break;
          }
L
Liu Jicong 已提交
404
        }
L
Liu Jicong 已提交
405 406
      }
    }
407

408
    if (streamTaskShouldStop(&pTask->status)) {
409 410 411
      if (pInput) {
        streamFreeQitem(pInput);
      }
412

L
Liu Jicong 已提交
413
      return 0;
L
Liu Jicong 已提交
414
    }
L
Liu Jicong 已提交
415

416
    if (pInput == NULL) {
L
Liu Jicong 已提交
417 418 419
      break;
    }

420
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
421
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
422
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
423
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
424
      continue;
L
Liu Jicong 已提交
425
    }
L
Liu Jicong 已提交
426

427 428 429 430 431 432 433
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
               atomic_load_8(&pTask->status.taskStatus));
        taosMsleep(2);
434
      } else {
435
        break;
436 437
      }
    }
438

439 440
    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
H
Haojun Liao 已提交
441

442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
475
      }
476
    }
477

478 479 480
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
481

482 483
    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
484
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
485
  }
486

L
Liu Jicong 已提交
487
  return 0;
L
Liu Jicong 已提交
488 489
}

L
Liu Jicong 已提交
490
int32_t streamTryExec(SStreamTask* pTask) {
491
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
492
  int8_t schedStatus =
493
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
494

L
Liu Jicong 已提交
495 496 497
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
498
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
499 500
      return -1;
    }
501

502
    // todo the task should be commit here
503
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
504
    qDebug("s-task:%s exec completed", pTask->id.idStr);
L
Liu Jicong 已提交
505

dengyihao's avatar
dengyihao 已提交
506 507
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
508
      streamSchedExec(pTask);
L
Liu Jicong 已提交
509 510
    }
  }
511

L
Liu Jicong 已提交
512
  return 0;
L
Liu Jicong 已提交
513
}