streamExec.c 16.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128
// a source task keeps polling its input queue for a few more rounds while the
// collected batch is still smaller than this (see streamExecForAll)
#define MIN_STREAM_EXEC_BATCH_NUM 16
// number of result blocks accumulated by streamTaskExecImpl before they are
// dumped to the task output
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24 25
// Reads the checkpoint version from the task's executor and, when the checkpoint id
// has advanced, stores it in the task and saves/commits the task meta.
static int32_t updateCheckPointInfo (SStreamTask* pTask);
// Wraps the result array pRes into a newly allocated STREAM_INPUT__DATA_BLOCK queue item;
// returns NULL if the queue item cannot be allocated.
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
33 34 35
  return (status == TASK_STATUS__PAUSE);
}

36 37
// Sends one accumulated batch of result blocks to the task output.
//
// pTask       - task that produced the results
// pItem       - input item the results were computed from (forwarded to the block builder)
// pRes        - array of SSDataBlock results; it is always consumed here (destroyed directly
//               or through the stream data block that takes it over), so the caller must
//               not touch it afterwards
// size        - accumulated size of the results in bytes, as computed by the caller
// totalSize   - running total, increased by size when the batch was output
// totalBlocks - running total, increased by the number of blocks when the batch was output
//
// Returns TSDB_CODE_SUCCESS on success (including an empty pRes), the error from
// updateCheckPointInfo() if that fails, or -1 if the stream block cannot be created or
// the output reports TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY.
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  int32_t blockCnt = taosArrayGetSize(pRes);
  if (blockCnt <= 0) {
    // nothing to send, only the result list itself has to be released
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOut = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
  if (pOut == NULL) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, blockCnt, size/1048576.0);

  code = streamTaskOutputResultBlock(pTask, pOut);

  // back pressure: the batch is not counted and the caller is told to stop
  bool rejected = (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY);
  if (!rejected) {
    *totalSize += size;
    *totalBlocks += blockCnt;
  }

  // NOTE(review): the block is destroyed here even after a successful hand-off, while
  // streamScanExec() in this file does not free the item it passes to
  // streamTaskOutputResultBlock() - confirm who owns the item after a successful output.
  destroyStreamDataBlock(pOut);

  return rejected ? -1 : TSDB_CODE_SUCCESS;
}

71
// Runs the task's executor on the input that has already been set, collecting the
// produced blocks and dumping them to the task output in batches.
//
// pTask       - task whose executor (pTask->exec.pExecutor) is driven
// pItem       - the input item being processed; for STREAM_INPUT__DATA_RETRIEVE a
//               STREAM_PULL_OVER block is appended once the executor is drained
// totalSize   - out: accumulated size in bytes of all dumped results (reset to 0 first)
// totalBlocks - out: number of dumped result blocks (reset to 0 first)
//
// Returns 0 when the executor is drained or the task should stop, -1 (terrno set) when
// the result list cannot be allocated, or the error code of doDumpResult().
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    // a fresh list is needed at start and after every dump (doDumpResult consumes it)
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      // the list may already hold copied blocks, so their payload has to be released too
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      // executor drained; a retrieve request is answered with a pull-over block
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock block = {0};

        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);

        // count the block, otherwise it is dropped by the final dump check below (or
        // breaks the numOfBlocks == array size assertion)
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {
    taosArrayDestroy(pRes);
  }

  return 0;
}

168
// Repeatedly runs the executor of a source-level task and outputs the produced blocks
// in batches of at most batchSz blocks, until qStreamRecoverScanFinished() reports that
// the scan is done.
//
// pTask   - task to run; must be TASK_LEVEL__SOURCE (asserted)
// batchSz - number of blocks collected before a batch is output
//
// Returns 0 when the scan finished or the task should stop/pause, -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY on allocation failure, or TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
// when the output rejects a batch.
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);

  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        continue;
      }

      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          // not finished yet: re-open the operator so the next round continues scanning
          qSetStreamOpOpen(exec);
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // the list is re-created at the top of every round, so the empty one must be
      // released on both exits, not only when the scan is finished
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      } else {
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
        continue;
      }
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);

      SStreamDataBlock* pBlock = NULL;
      streamDispatch(pTask, &pBlock);
      destroyStreamDataBlock(pBlock);
    }

    if (finished) {
      break;
    }
  }
  return 0;
}

#if 0
262 263 264 265
// Unfinished batch executor (this definition sits inside an `#if 0` region and is not
// compiled). It reads all queued items, returns -1 on a STREAM_INPUT__DESTROY item and,
// for sink tasks, outputs the first item directly; merging/executing is not implemented.
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  if (taosReadAllQitems(pTask->inputQueue1, pTask->inputQall) == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }

  SStreamQueueItem* pMergedItem = NULL;
  SStreamQueueItem* pHead = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pHead);

  // no item and nothing merged so far: done (a pending merged item would be processed here)
  if (pHead == NULL && pMergedItem == NULL) {
    return 0;
  }

  // if drop
  if (pHead->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->taskLevel == TASK_LEVEL__SINK) {
    ASSERT(pHead->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pHead);
  }

  // exec impl

  // output
  // try dispatch
  return 0;
}
297
#endif
L
Liu Jicong 已提交
298

299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
// Refreshes the task's checkpoint info from its executor.
//
// The checkpoint id and data version are read with qGetCheckpointVersion(). Only when the
// id is greater than the one stored in pTask->chkInfo is the info replaced (currentVer is
// carried over) and the task meta saved and committed under the meta write latch.
//
// Returns TSDB_CODE_SUCCESS when nothing changed or the commit succeeded, -1 when
// streamMetaCommit() fails.
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t newCkId = 0;
  int64_t newDataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &newDataVer, &newCkId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (newCkId <= pInfo->id) {
    // checkpoint has not advanced, nothing to persist
    return TSDB_CODE_SUCCESS;
  }

  // save it since the checkpoint is updated
  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pInfo->version, newDataVer, pInfo->id, newCkId);

  pTask->chkInfo = (SCheckpointInfo){.version = newDataVer, .id = newCkId, .currentVer = pInfo->currentVer};

  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  int32_t commitCode = streamMetaCommit(pTask->pMeta);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitCode < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

// Builds a STREAM_INPUT__DATA_BLOCK queue item that takes over the result array pRes.
//
// pItem      - input item the results came from; for (merged) submit input its version
//              is copied into sourceVer and childId is set to the task's selfChildId
// pTask      - task that produced the results
// resultSize - size passed to taosAllocateQitem() for the new item
// pRes       - array of result blocks; stored in the new item on success, its elements
//              are cleared (the array itself is not destroyed) on failure
//
// Returns the new item, or NULL if the queue item cannot be allocated.
SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pNewBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pNewBlock == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pNewBlock->type = STREAM_INPUT__DATA_BLOCK;
  pNewBlock->blocks = pRes;

  switch (pItem->type) {
    case STREAM_INPUT__DATA_SUBMIT: {
      SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
      pNewBlock->childId = pTask->selfChildId;
      pNewBlock->sourceVer = pSubmit->ver;
      break;
    }
    case STREAM_INPUT__MERGED_SUBMIT: {
      SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
      pNewBlock->childId = pTask->selfChildId;
      pNewBlock->sourceVer = pMerged->ver;
      break;
    }
    default:
      // other input types leave childId/sourceVer as allocated
      break;
  }

  return pNewBlock;
}

350
// Releases a stream data block: frees every contained result block together with the
// array, then the queue item itself. A NULL pBlock is ignored.
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock != NULL) {
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
    taosFreeQitem(pBlock);
  }
}

L
Liu Jicong 已提交
359
// Processes the task's input queue until it is empty or the task is paused/stopped.
//
// Each round collects one input batch (merging consecutive queue items when
// streamMergeQueueItem() allows it, up to MAX_STREAM_EXEC_BATCH_NUM), then either
//  - sink task: passes the item straight to streamTaskOutputResultBlock(), or
//  - otherwise: sets the batch as executor input and runs streamTaskExecImpl().
//
// Always returns 0.
int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t batchSize = 1;
    int16_t times = 0;

    SStreamQueueItem* pInput = NULL;

    // merge multiple input data if possible in the input queue.
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

    while (1) {
      if (streamTaskShouldPause(&pTask->status)) {
        if (pInput != NULL) {
          // input was already taken from the queue: process it instead of abandoning it
          break;
        }
        return 0;
      }

      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        // a source task waits briefly (at most 5 times) for more input while the batch is small
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
          qDebug("===stream===try again batchSize:%d", batchSize);
          continue;
        }

        qDebug("===stream===break batchSize:%d", batchSize);
        break;
      }

      if (pInput == NULL) {
        pInput = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
          // sink tasks handle one item at a time
          break;
        }
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = streamMergeQueueItem(pInput, qItem);
        if (newRet == NULL) {
          // cannot be merged into the current batch: leave it for the next round
          streamQueueProcessFail(pTask->inputQueue);
          break;
        }

        batchSize++;
        pInput = newRet;
        streamQueueProcessSuccess(pTask->inputQueue);
        if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
          qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
          break;
        }
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      if (pInput) {
        streamFreeQitem(pInput);
      }

      return 0;
    }

    if (pInput == NULL) {
      break;
    }

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__PAUSE) {
        break;
      }

      // a task stopped/dropped while waiting would never reach NORMAL/PAUSE; give up
      // the same way as the stop check above
      if (streamTaskShouldStop(&pTask->status)) {
        streamFreeQitem(pInput);
        return 0;
      }

      qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, status);
      taosMsleep(2);
    }

    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);

    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
      }
    }

    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);

    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
    streamFreeQitem(pInput);
  }

  return 0;
}

L
Liu Jicong 已提交
494
// Runs the task if this caller wins the WAITING -> ACTIVE transition of the schedule status.
//
// After streamExecForAll() the schedule status becomes INACTIVE (or FAILED on a negative
// result), and the task is scheduled again when its input queue is not empty and it is
// neither stopped nor paused.
//
// Returns -1 when streamExecForAll() fails, 0 otherwise (also when another caller
// already owns the task).
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t prevSched =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevSched != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  if (streamExecForAll(pTask) < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed", pTask->id.idStr);

  // nothing left to do, or the task must not run any more
  if (taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
      streamTaskShouldPause(&pTask->status)) {
    return 0;
  }

  streamSchedExec(pTask);
  return 0;
}