streamExec.c 16.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128
L
liuyao 已提交
20
#define MIN_STREAM_EXEC_BATCH_NUM 16
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24 25
static int32_t updateCheckPointInfo (SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);

26
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
27
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
28 29 30
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
31
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
32
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
33 34 35
  return (status == TASK_STATUS__PAUSE);
}

36 37
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
38 39
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
40
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
41 42 43 44 45 46 47
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
    SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
    if (pStreamBlocks == NULL) {
48
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
49 50 51 52 53 54 55
      return -1;
    }

    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
56
      destroyStreamDataBlock(pStreamBlocks);
57 58
      return -1;
    }
59 60 61 62 63

    *totalSize += size;
    *totalBlocks += numOfBlocks;

    destroyStreamDataBlock(pStreamBlocks);
64
  } else {
65
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
66 67 68 69 70
  }

  return TSDB_CODE_SUCCESS;
}

71
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
72 73
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
74

75 76 77 78 79
  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
H
Haojun Liao 已提交
80
  SArray* pRes = NULL;
L
Liu Jicong 已提交
81 82

  while (1) {
H
Haojun Liao 已提交
83 84 85
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }
H
Haojun Liao 已提交
86

87
    if (streamTaskShouldStop(&pTask->status)) {
88
      taosArrayDestroy(pRes); // memory leak
L
Liu Jicong 已提交
89 90 91
      return 0;
    }

L
Liu Jicong 已提交
92 93
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
94
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
95
      if (code == TSDB_CODE_QRY_IN_EXEC) {
96
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
97
      }
98 99

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
100
      continue;
L
Liu Jicong 已提交
101
    }
102

103
    if (output == NULL) {
5
54liuyao 已提交
104
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
105 106
        SSDataBlock block = {0};

107
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
108
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
109

L
Liu Jicong 已提交
110
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
111
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
112 113
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
L
Liu Jicong 已提交
114

H
Haojun Liao 已提交
115
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
L
Liu Jicong 已提交
116
               pRetrieveBlock->reqId);
117
      }
H
Haojun Liao 已提交
118

119 120
      break;
    }
L
Liu Jicong 已提交
121 122 123 124 125 126 127 128

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
129 130 131
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
H
Haojun Liao 已提交
132

133 134 135
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
136
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
137

138 139 140 141
    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
142 143 144
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
145 146 147 148
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

149 150
      size = 0;
      numOfBlocks = 0;
151
    }
152
  }
153

154 155 156 157 158 159
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
160 161
  } else {
    taosArrayDestroy(pRes);
L
Liu Jicong 已提交
162
  }
163

L
Liu Jicong 已提交
164 165 166
  return 0;
}

167
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
168
  int32_t code = 0;
169

dengyihao's avatar
dengyihao 已提交
170
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
171

172
  void* exec = pTask->exec.pExecutor;
173

L
Liu Jicong 已提交
174
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
175
  bool finished = false;
L
Liu Jicong 已提交
176

177 178 179 180 181 182 183 184 185
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
186
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
187
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
188 189 190
        return 0;
      }

191 192 193
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
194
        continue;
195
      }
L
Liu Jicong 已提交
196
      if (output == NULL) {
L
Liu Jicong 已提交
197 198 199 200 201
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
202 203
        break;
      }
204 205 206 207 208 209

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
210 211
      batchCnt++;

212
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
213 214 215
      if (batchCnt >= batchSz) {
        break;
      }
216
    }
H
Haojun Liao 已提交
217

218
    if (taosArrayGetSize(pRes) == 0) {
219 220
      if (finished) {
        taosArrayDestroy(pRes);
H
Haojun Liao 已提交
221
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
222 223
        break;
      } else {
H
Haojun Liao 已提交
224
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
225 226
        continue;
      }
227
    }
H
Haojun Liao 已提交
228

S
Shengliang Guan 已提交
229
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
230 231 232 233 234 235 236 237
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
238
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
239
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
240
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
241
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
242 243
      return code;
    }
L
Liu Jicong 已提交
244 245

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
246
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
247 248 249 250

      SStreamDataBlock* pBlock = NULL;
      streamDispatch(pTask, &pBlock);
      destroyStreamDataBlock(pBlock);
L
Liu Jicong 已提交
251
    }
252 253 254 255

    if (finished) {
      break;
    }
256 257 258 259 260
  }
  return 0;
}

#if 0
261 262 263 264
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
265
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
266 267 268 269 270 271 272 273 274 275
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
276
    }
277
  }
278

279 280 281 282 283
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
284

285
  if (pTask->taskLevel == TASK_LEVEL__SINK) {
286
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
287
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
288 289
  }

290 291 292 293
  // exec impl

  // output
  // try dispatch
294 295
  return 0;
}
296
#endif
L
Liu Jicong 已提交
297

298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
           ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pStreamBlocks == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
  pStreamBlocks->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pSubmit->ver;
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pMerged->ver;
  }

  return pStreamBlocks;
}

349
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
H
Haojun Liao 已提交
350 351 352 353
  if (pBlock == NULL) {
    return;
  }

354 355 356 357
  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
}

L
Liu Jicong 已提交
358
int32_t streamExecForAll(SStreamTask* pTask) {
dengyihao's avatar
dengyihao 已提交
359
  int32_t code = 0;
L
Liu Jicong 已提交
360
  while (1) {
361
    int32_t batchSize = 1;
L
liuyao 已提交
362
    int16_t times = 0;
363

364 365
    SStreamQueueItem* pInput = NULL;

366
    // merge multiple input data if possible in the input queue.
H
Haojun Liao 已提交
367 368
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

L
Liu Jicong 已提交
369
    while (1) {
L
liuyao 已提交
370 371 372
      if (streamTaskShouldPause(&pTask->status)) {
        return 0;
      }
373

L
Liu Jicong 已提交
374 375
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
L
liuyao 已提交
376 377 378
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
379
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
380 381
          continue;
        }
382

L
liuyao 已提交
383
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
384
        break;
L
Liu Jicong 已提交
385
      }
386 387 388

      if (pInput == NULL) {
        pInput = qItem;
389
        streamQueueProcessSuccess(pTask->inputQueue);
390
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
391
          break;
L
Liu Jicong 已提交
392
        }
L
Liu Jicong 已提交
393
      } else {
394
        // todo we need to sort the data block, instead of just appending into the array list.
395 396
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
397 398 399
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
400 401
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
402
          streamQueueProcessSuccess(pTask->inputQueue);
L
liuyao 已提交
403
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
H
Haojun Liao 已提交
404
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
5
54liuyao 已提交
405 406
            break;
          }
L
Liu Jicong 已提交
407
        }
L
Liu Jicong 已提交
408 409
      }
    }
410

411
    if (streamTaskShouldStop(&pTask->status)) {
412 413 414
      if (pInput) {
        streamFreeQitem(pInput);
      }
415

L
Liu Jicong 已提交
416
      return 0;
L
Liu Jicong 已提交
417
    }
L
Liu Jicong 已提交
418

419
    if (pInput == NULL) {
L
Liu Jicong 已提交
420 421 422
      break;
    }

423
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
424
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
425
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
426
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
427
      continue;
L
Liu Jicong 已提交
428
    }
L
Liu Jicong 已提交
429

430 431 432 433 434 435 436
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
               atomic_load_8(&pTask->status.taskStatus));
        taosMsleep(2);
437
      } else {
438
        break;
439 440
      }
    }
441

442 443
    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
H
Haojun Liao 已提交
444

445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
478
      }
479
    }
480

481 482 483
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
484

485 486
    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
487
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
488
  }
489

L
Liu Jicong 已提交
490
  return 0;
L
Liu Jicong 已提交
491 492
}

L
Liu Jicong 已提交
493
int32_t streamTryExec(SStreamTask* pTask) {
494
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
495
  int8_t schedStatus =
496
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
497

L
Liu Jicong 已提交
498 499 500
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
501
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
502 503
      return -1;
    }
504

505
    // todo the task should be commit here
506
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
507
    qDebug("s-task:%s exec completed", pTask->id.idStr);
L
Liu Jicong 已提交
508

dengyihao's avatar
dengyihao 已提交
509 510
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
511
      streamSchedExec(pTask);
L
Liu Jicong 已提交
512 513
    }
  }
514

L
Liu Jicong 已提交
515
  return 0;
L
Liu Jicong 已提交
516
}