streamExec.c 16.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// maximum allowed processed block batches. One block may include several submit blocks
// streamExecForAll stops merging further queue items into one input once its batch size exceeds this value.
#define MAX_STREAM_EXEC_BATCH_NUM 128
L
liuyao 已提交
20
// While a source-level task has merged fewer items than this, streamExecForAll retries an empty input queue
// a few times (with a short sleep) before it starts executing.
#define MIN_STREAM_EXEC_BATCH_NUM 16
21
// Number of accumulated result blocks at which streamTaskExecImpl dumps them before continuing to execute.
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24 25
// Forward declarations of file-local helpers that are defined after their first use.
// Reads the executor's checkpoint version and saves the task meta when the checkpoint id has advanced.
static int32_t updateCheckPointInfo (SStreamTask* pTask);
// Wraps a result array into a newly allocated SStreamDataBlock queue item; returns NULL on allocation failure.
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
33 34 35
  return (status == TASK_STATUS__PAUSE);
}

36 37
// Flush one batch of executor results.
//
// The checkpoint info is refreshed first; then, if pRes holds any blocks, they are wrapped into a
// stream data block and passed to streamTaskOutputResultBlock. On success *totalSize and
// *totalBlocks are increased by this batch.
//
// pRes is always consumed: on every path it is either destroyed here or destroyed together with
// the stream data block that wraps it, so the caller must not touch it afterwards.
//
// Returns TSDB_CODE_SUCCESS, the error of updateCheckPointInfo, or -1 when the stream block could
// not be created or the output reported TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY.
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  const int32_t ckptCode = updateCheckPointInfo(pTask);
  if (ckptCode != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return ckptCode;
  }

  const int32_t blockCount = taosArrayGetSize(pRes);
  if (blockCount <= 0) {
    // nothing to forward, only the result list itself has to be released
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOutBlock = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
  if (pOutBlock == NULL) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, blockCount, size/1048576.0);

  const int32_t outCode = streamTaskOutputResultBlock(pTask, pOutBlock);
  if (outCode == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
    // back pressure and record position
    destroyStreamDataBlock(pOutBlock);
    return -1;
  }

  *totalSize += size;
  *totalBlocks += blockCount;

  // NOTE(review): the block is destroyed even though it was just handed to streamTaskOutputResultBlock,
  // while streamScanExec keeps the block alive after a successful output -- confirm who owns it.
  destroyStreamDataBlock(pOutBlock);
  return TSDB_CODE_SUCCESS;
}

71
// Drive the executor until it yields no more output and dump the produced blocks.
//
// The executor input must already be set by the caller. Every produced block is copied into a
// result array; the array is dumped through doDumpResult whenever it holds
// MAX_STREAM_RESULT_DUMP_THRESHOLD blocks and once more after the executor is drained.
//
// pItem       the queue item currently being processed (only its type/content is inspected)
// totalSize   out: accumulated size of all dumped results, reset to 0 on entry
// totalBlocks out: accumulated number of dumped blocks, reset to 0 on entry
//
// Returns 0 on success or when the task is asked to stop, -1 (with terrno set) when the result
// array cannot be allocated, otherwise the error code reported by doDumpResult.
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  if (pRes == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  while (1) {
    if (streamTaskShouldStop(&pTask->status)) {
      // release the payload of the blocks collected so far, not only the array itself
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      // NOTE(review): a persistently failing executor keeps this loop spinning until the task is stopped
      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock block = {0};

        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);

        // count the pull-over block, otherwise it is never dumped (and leaks) when it is the only result
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        // pRes has already been released by doDumpResult
        return code;
      }

      // doDumpResult consumed pRes, so the following blocks need a fresh array
      pRes = taosArrayInit(0, sizeof(SSDataBlock));
      if (pRes == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return 0;
}

163
// Run the recover scan of a source-level task.
//
// The executor is polled in batches of at most batchSz blocks. Each non-empty batch is wrapped into
// a STREAM_INPUT__DATA_BLOCK queue item, passed to streamTaskOutputResultBlock and, for the
// dispatching output types, dispatched right away. The scan ends when qStreamRecoverScanFinished
// reports completion.
//
// Returns 0 on completion or when the task is stopped/paused, -1 (terrno set) on allocation
// failure, or TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output reports it.
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);

  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        continue;
      }
      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          // not finished yet: re-open the operator so the next round keeps scanning
          qSetStreamOpOpen(exec);
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // the empty array is not handed to anyone; release it on both paths so that
      // the "continue" round does not leak one array per iteration
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      } else {
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
        continue;
      }
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);

      SStreamDataBlock* pBlock = NULL;
      streamDispatch(pTask, &pBlock);
      destroyStreamDataBlock(pBlock);
    }

    if (finished) {
      break;
    }
  }
  return 0;
}

#if 0
// Unfinished draft of a batch executor; the surrounding #if 0 keeps it out of the build.
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  const int32_t fetched = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (fetched == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }

  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL && pMerged == NULL) {
    // no item fetched and nothing merged so far
    return 0;
  }
  // a merged item without a new pItem would be processed here

  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->taskLevel == TASK_LEVEL__SINK) {
    ASSERT(pItem->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
  }

  // remaining steps are not implemented yet:
  //   exec impl
  //   output
  //   try dispatch
  return 0;
}
#endif
L
Liu Jicong 已提交
293

294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
// Refresh the task's checkpoint info from the executor.
//
// The executor is asked for its current data version and checkpoint id. Nothing happens unless the
// checkpoint id is greater than the one stored in the task; otherwise the task's chkInfo is
// replaced (currentVer is carried over) and the task meta is saved and committed while holding the
// meta write latch.
//
// Returns TSDB_CODE_SUCCESS, or -1 when committing the stream meta fails.
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t newCkId = 0;
  int64_t newVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &newVer, &newCkId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (newCkId <= pInfo->id) {
    // checkpoint did not advance, nothing to persist
    return TSDB_CODE_SUCCESS;
  }

  // save it since the checkpoint is updated
  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pInfo->version, newVer, pInfo->id, newCkId);

  pTask->chkInfo = (SCheckpointInfo){.version = newVer, .id = newCkId, .currentVer = pInfo->currentVer};

  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  const int32_t commitCode = streamMetaCommit(pTask->pMeta);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitCode < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

// Wrap a result array into a newly allocated STREAM_INPUT__DATA_BLOCK queue item.
//
// pItem      the input item the results were produced from; for (merged) submit inputs its version
//            is recorded as sourceVer and the task's selfChildId as childId
// resultSize size passed on to taosAllocateQitem
// pRes       result blocks; referenced by the returned item on success
//
// Returns the new item, or NULL when the allocation fails; in that case the elements of pRes have
// been cleared with blockDataFreeRes, and the array itself is left to the caller.
SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pNewBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pNewBlock == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pNewBlock->type = STREAM_INPUT__DATA_BLOCK;
  pNewBlock->blocks = pRes;

  const int32_t inputType = pItem->type;
  if (inputType == STREAM_INPUT__DATA_SUBMIT) {
    pNewBlock->childId = pTask->selfChildId;
    pNewBlock->sourceVer = ((SStreamDataSubmit*)pItem)->ver;
    return pNewBlock;
  }

  if (inputType == STREAM_INPUT__MERGED_SUBMIT) {
    pNewBlock->childId = pTask->selfChildId;
    pNewBlock->sourceVer = ((SStreamMergedSubmit*)pItem)->ver;
  }

  return pNewBlock;
}

345
// Release a stream data block: its block array (elements freed with blockDataFreeRes) and then the
// queue item itself. A NULL pBlock is ignored.
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock != NULL) {
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
    taosFreeQitem(pBlock);
  }
}

L
Liu Jicong 已提交
354
// Drain the task's input queue: repeatedly merge queued items into one input, feed it to the
// executor and run streamTaskExecImpl on it, until the queue yields nothing more.
//
// Sink-level tasks do not execute anything; each item is passed straight to
// streamTaskOutputResultBlock. Source-level tasks wait until the task status is NORMAL or PAUSE
// before executing.
//
// Always returns 0 (also when the task is paused or stopped in the middle of the work).
int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t batchSize = 1;
    int16_t times = 0;

    SStreamQueueItem* pInput = NULL;

    // merge multiple input data if possible in the input queue.
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

    while (1) {
      // NOTE(review): returning here while pInput already holds dequeued data neither processes
      // nor frees that data -- confirm whether this is intended.
      if (streamTaskShouldPause(&pTask->status)) {
        return 0;
      }

      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        // give a source task a few short chances to collect a larger batch
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
          qDebug("===stream===try again batchSize:%d", batchSize);
          continue;
        }

        qDebug("===stream===break batchSize:%d", batchSize);
        break;
      }

      if (pInput == NULL) {
        pInput = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
          break;
        }
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
          batchSize++;
          pInput = newRet;
          streamQueueProcessSuccess(pTask->inputQueue);
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
            break;
          }
        }
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      if (pInput) {
        streamFreeQitem(pInput);
      }

      return 0;
    }

    if (pInput == NULL) {
      break;
    }

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__PAUSE) {
        break;
      }

      // a task that gets stopped/dropped while waiting never becomes ready; bail out instead of
      // sleeping forever, releasing the input exactly like the stop check above does
      if (streamTaskShouldStop(&pTask->status)) {
        streamFreeQitem(pInput);
        return 0;
      }

      qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, status);
      taosMsleep(2);
    }

    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);

    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
      }
    }

    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    // NOTE(review): the result code of the execution is not inspected here
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);

    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
    streamFreeQitem(pInput);
  }

  return 0;
}

L
Liu Jicong 已提交
489
// Try to run the task once.
//
// The schedule status is atomically switched from WAITING to ACTIVE; only the caller that wins
// this exchange executes the task (this function may be executed by multi-threads). After a
// successful run the status becomes INACTIVE and, if the input queue is not empty and the task is
// neither stopped nor paused, another execution is scheduled.
//
// Returns 0, or -1 when streamExecForAll fails (the schedule status is then set to FAILED).
int32_t streamTryExec(SStreamTask* pTask) {
  const int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevStatus != TASK_SCHED_STATUS__WAITING) {
    // the task was not waiting for execution, nothing to do for this caller
    return 0;
  }

  const int32_t execCode = streamExecForAll(pTask);
  if (execCode < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed", pTask->id.idStr);

  if (taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
      streamTaskShouldPause(&pTask->status)) {
    return 0;
  }

  // more input is pending and the task is still runnable
  streamSchedExec(pTask);
  return 0;
}