streamExec.c 16.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// Maximum allowed processed block batches. One block may include several submit blocks.
// streamExecForAll stops merging queue items into the current input once the batch grows past this value.
#define MAX_STREAM_EXEC_BATCH_NUM        128

// While the current batch is smaller than this, a source-level task retries an empty input queue a few
// times (see streamExecForAll) before it starts executing.
#define MIN_STREAM_EXEC_BATCH_NUM        16

// Number of accumulated result blocks that makes streamTaskExecImpl dump them before the executor is drained.
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000

// Helpers defined further down in this file.
static int32_t           updateCheckPointInfo(SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask,
                                                          int64_t resultSize, SArray* pRes);

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
33 34 35
  return (status == TASK_STATUS__PAUSE);
}

36 37
// Update the checkpoint info of the task and forward the accumulated result blocks to the task output.
//
// pRes is always consumed by this function: it is either destroyed here (checkpoint update failure, empty
// result, allocation failure, back pressure) or attached to the stream data block passed to
// streamTaskOutputResultBlock, so the caller must not touch it afterwards.
//
// pItem        input item the results were generated from
// size         accumulated size in bytes of the blocks held by pRes
// totalSize    [in/out] increased by size after the blocks are handed to the output
// totalBlocks  [in/out] increased by the number of blocks after they are handed to the output
//
// Returns TSDB_CODE_SUCCESS on success, the error code of updateCheckPointInfo if that step fails, or -1 if
// the stream data block cannot be created or the output reports TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY.
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t ret = updateCheckPointInfo(pTask);
  if (ret != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return ret;
  }

  int32_t blockCnt = taosArrayGetSize(pRes);
  if (blockCnt <= 0) {
    // nothing to forward, only the container has to be released
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOutput = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
  if (pOutput == NULL) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, blockCnt, size/1048576.0);

  ret = streamTaskOutputResultBlock(pTask, pOutput);
  if (ret == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
    destroyStreamDataBlock(pOutput);
    return -1;
  }

  *totalSize += size;
  *totalBlocks += blockCnt;
  return TSDB_CODE_SUCCESS;
}

69
// Run the executor of the task until it yields no more output and dump the generated blocks.
//
// Result blocks are collected into an array and handed to doDumpResult whenever
// MAX_STREAM_RESULT_DUMP_THRESHOLD blocks have accumulated, and once more after the executor is drained.
// When the input item is a retrieve request, one extra STREAM_PULL_OVER block is appended after the executor
// is drained.
//
// pItem        the input item that has already been set as the input of the executor
// totalSize    [out] reset to 0, then increased by doDumpResult for every dumped batch
// totalBlocks  [out] reset to 0, then increased by doDumpResult for every dumped batch
//
// Returns 0 on success or when the task should stop, -1 (terrno set) if the result array cannot be
// allocated, or the non-zero code reported by doDumpResult.
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      // release the blocks collected so far together with the container, not only the container
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock block = {0};

        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);

        // the pull-over block must be counted, otherwise it is never dumped (and leaks) when it is the
        // only block, or breaks the numOfBlocks == array size invariant checked below
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // doDumpResult has consumed the array, a new one is created at the top of the loop
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return 0;
}

166
// Execute the scan of a source-level task in batches and push every batch to the task output.
//
// Each round collects at most batchSz blocks from the executor, wraps them into a queue item of type
// STREAM_INPUT__DATA_BLOCK and outputs it; for the dispatch output types the data is dispatched right away.
// The loop ends when qStreamRecoverScanFinished reports completion for the executor.
//
// pTask    must be a TASK_LEVEL__SOURCE task (asserted)
// batchSz  maximum number of blocks collected before one batch is handed to the output
//
// Returns 0 on completion or when the task should stop/pause, -1 (terrno set to TSDB_CODE_OUT_OF_MEMORY) on
// allocation failure, or TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output rejects the batch with that code.
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);

  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        continue;
      }

      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // the empty array is not handed over to anybody; release it on both paths, since the next round
      // allocates a fresh one
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      } else {
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
        continue;
      }
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
      streamDispatch(pTask);
    }

    if (finished) {
      break;
    }
  }
  return 0;
}

#if 0
// Unfinished draft of a batch based executor. It is excluded from the build by the surrounding "#if 0".
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t fetched = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (fetched == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }

  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged == NULL) {
      return 0;
    }
    // process merged item
  }

  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->taskLevel == TASK_LEVEL__SINK) {
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
  }

  // exec impl

  // output
  // try dispatch
  return 0;
}
#endif
L
Liu Jicong 已提交
293

294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
// Fetch the checkpoint version/id from the executor and, if the checkpoint id is greater than the one
// recorded in the task, store the new values in the task and save + commit the task through the stream meta
// while holding the write latch of the meta.
//
// Returns TSDB_CODE_SUCCESS when the checkpoint id has not advanced or the commit succeeded, -1 when
// streamMetaCommit fails.
int32_t updateCheckPointInfo(SStreamTask* pTask) {
  int64_t latestId = 0;
  int64_t latestVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &latestVer, &latestId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (latestId <= pInfo->id) {
    // the checkpoint is not updated, nothing to save
    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pInfo->version, latestVer, pInfo->id, latestId);

  // currentVer is carried over, only the version and the id are replaced
  pTask->chkInfo = (SCheckpointInfo){.version = latestVer, .id = latestId, .currentVer = pInfo->currentVer};

  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  bool committed = (streamMetaCommit(pTask->pMeta) >= 0);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (!committed) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

// Allocate a queue item of type STREAM_INPUT__DATA_BLOCK that carries the result blocks in pRes.
//
// On success the returned item references pRes through its "blocks" field. For submit and merged-submit
// inputs the child id of the task and the version of the input item are recorded in the new item as well.
// On allocation failure the elements of pRes are released with blockDataFreeRes (the array itself is left
// to the caller) and NULL is returned.
SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pNew = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pNew == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pNew->type = STREAM_INPUT__DATA_BLOCK;
  pNew->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    const SStreamDataSubmit* pSrc = (SStreamDataSubmit*)pItem;
    pNew->childId = pTask->selfChildId;
    pNew->sourceVer = pSrc->ver;
    return pNew;
  }

  if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    const SStreamMergedSubmit* pSrc = (SStreamMergedSubmit*)pItem;
    pNew->childId = pTask->selfChildId;
    pNew->sourceVer = pSrc->ver;
  }

  return pNew;
}

345
// Release a stream data block: the blocks it carries are freed with blockDataFreeRes, then the queue item
// itself is freed. A NULL pointer is ignored.
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock != NULL) {
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
    taosFreeQitem(pBlock);
  }
}

L
Liu Jicong 已提交
354
// Consume the input queue of the task until it is empty.
//
// Every round merges as many queued items as possible (up to MAX_STREAM_EXEC_BATCH_NUM) into one input, then
//  - sink tasks pass the input directly to streamTaskOutputResultBlock;
//  - other tasks set the input on the executor and run streamTaskExecImpl, after which the input is freed.
// Source tasks wait until the task status is TASK_STATUS__NORMAL or TASK_STATUS__PAUSE before executing.
//
// Always returns 0: when the queue is drained, when the task is paused with no pending input, or when the
// task should stop (the pending input is freed in that case).
int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t batchSize = 1;
    int16_t times = 0;

    SStreamQueueItem* pInput = NULL;

    // merge multiple input data if possible in the input queue.
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

    while (1) {
      if (streamTaskShouldPause(&pTask->status)) {
        if (pInput != NULL) {
          // the items merged so far were already acknowledged with streamQueueProcessSuccess; process them
          // instead of returning, otherwise they would be dropped and never freed
          break;
        }
        return 0;
      }

      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        // source tasks wait briefly (at most 5 times) for more data while the batch is still small
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
          qDebug("===stream===try again batchSize:%d", batchSize);
          continue;
        }

        qDebug("===stream===break batchSize:%d", batchSize);
        break;
      }

      if (pInput == NULL) {
        pInput = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
          break;
        }
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
          batchSize++;
          pInput = newRet;
          streamQueueProcessSuccess(pTask->inputQueue);
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
            break;
          }
        }
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      if (pInput) {
        streamFreeQitem(pInput);
      }

      return 0;
    }

    if (pInput == NULL) {
      break;
    }

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
        // report the very value that was tested above
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, status);
        taosMsleep(2);
      } else {
        break;
      }
    }

    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);

    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
      }
    }

    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);

    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
    streamFreeQitem(pInput);
  }

  return 0;
}

L
Liu Jicong 已提交
489
// Try to run the task once.
//
// Only the caller that switches the schedule status from TASK_SCHED_STATUS__WAITING to
// TASK_SCHED_STATUS__ACTIVE executes the task; every other caller returns 0 immediately. After the execution
// the schedule status becomes TASK_SCHED_STATUS__INACTIVE (or TASK_SCHED_STATUS__FAILED on error), and the
// task is scheduled again if its input queue is not empty and the task is neither stopped nor paused.
//
// Returns 0 on success or when another caller owns the execution, -1 when streamExecForAll fails.
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevStatus != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  if (streamExecForAll(pTask) < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed", pTask->id.idStr);

  // new data may have arrived while the task was running, schedule another round for it
  bool hasPending = !taosQueueEmpty(pTask->inputQueue->queue);
  if (hasPending && !streamTaskShouldStop(&pTask->status) && !streamTaskShouldPause(&pTask->status)) {
    streamSchedExec(pTask);
  }

  return 0;
}