stream.c 18.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

19
#define STREAM_TASK_INPUT_QUEUE_CAPACITY          20480
20
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE  (30)
H
Haojun Liao 已提交
21
#define ONE_MB_F                                  (1048576.0)
dengyihao's avatar
dengyihao 已提交
22
#define QUEUE_MEM_SIZE_IN_MB(_q)                  (taosQueueMemorySize(_q) / ONE_MB_F)
23

24 25
SStreamGlobalEnv streamEnv;

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
int32_t streamInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM");
    if (streamEnv.timer == NULL) {
      atomic_store_8(&streamEnv.inited, 0);
      return -1;
    }
    atomic_store_8(&streamEnv.inited, 1);
  }
41

42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
  return 0;
}

void streamCleanUp() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(streamEnv.timer);
    atomic_store_8(&streamEnv.inited, 0);
  }
}

58 59 60 61 62 63
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
  char buf[128] = {0};
  sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
  return taosStrdup(buf);
}

H
Haojun Liao 已提交
64
static void streamSchedByTimer(void* param, void* tmrId) {
65 66
  SStreamTask* pTask = (void*)param;

67
  int8_t status = atomic_load_8(&pTask->triggerStatus);
68
  qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);
69

L
liuyao 已提交
70
  if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
Liu Jicong 已提交
71
    streamMetaReleaseTask(NULL, pTask);
72
    qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
L
Liu Jicong 已提交
73 74 75
    return;
  }

76
  if (status == TASK_TRIGGER_STATUS__ACTIVE) {
77 78
    SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
    if (pTrigger == NULL) {
79 80 81
      return;
    }

82 83 84 85
    pTrigger->type = STREAM_INPUT__GET_RES;
    pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
    if (pTrigger->pBlock == NULL) {
      taosFreeQitem(pTrigger);
86 87 88
      return;
    }

89
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
90 91 92
    pTrigger->pBlock->info.type = STREAM_GET_ALL;
    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) {
      taosFreeQitem(pTrigger);
93
      taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
L
Liu Jicong 已提交
94 95
      return;
    }
96

L
Liu Jicong 已提交
97
    streamSchedExec(pTask);
98 99
  }

100
  taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
101 102
}

103
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
104
  if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) {
L
Liu Jicong 已提交
105
    int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
106 107
    ASSERT(ref == 2 && pTask->schedTimer == NULL);

H
Haojun Liao 已提交
108
    qDebug("s-task:%s setup scheduler trigger, delay:%"PRId64" ms", pTask->id.idStr, pTask->triggerParam);
109 110

    pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
111
    pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
112
  }
113

114 115 116
  return 0;
}

L
Liu Jicong 已提交
117
int32_t streamSchedExec(SStreamTask* pTask) {
dengyihao's avatar
dengyihao 已提交
118 119
  int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                     TASK_SCHED_STATUS__WAITING);
120

L
Liu Jicong 已提交
121 122 123
  if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
    SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
    if (pRunReq == NULL) {
124
      terrno = TSDB_CODE_OUT_OF_MEMORY;
125
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
126
      qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
L
Liu Jicong 已提交
127 128
      return -1;
    }
129

130
    pRunReq->head.vgId = pTask->info.nodeId;
131 132
    pRunReq->streamId = pTask->id.streamId;
    pRunReq->taskId = pTask->id.taskId;
133

134 135
    qDebug("trigger to run s-task:%s", pTask->id.idStr);

dengyihao's avatar
dengyihao 已提交
136
    SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
L
Liu Jicong 已提交
137
    tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
138 139
  } else {
    qDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
L
Liu Jicong 已提交
140
  }
141

L
Liu Jicong 已提交
142 143 144
  return 0;
}

145
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
146
  int8_t status = 0;
147 148 149

  SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
  if (pBlock == NULL) {
150 151
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
152
    qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
153
           pTask->id.idStr);
154
  } else {
155 156 157
    int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
    // input queue is full, upstream is blocked now
    status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
L
Liu Jicong 已提交
158 159 160
  }

  // rsp by input status
161
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
L
Liu Jicong 已提交
162
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
163
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
164

165 166 167 168
  pDispatchRsp->inputStatus = status;
  pDispatchRsp->streamId = htobe64(pReq->streamId);
  pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
  pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
169
  pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
170 171 172
  pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);

  pRsp->pCont = buf;
L
Liu Jicong 已提交
173 174
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);
175

L
Liu Jicong 已提交
176 177 178 179
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
S
Shengliang Guan 已提交
180
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
L
Liu Jicong 已提交
181 182 183 184
  int8_t            status = TASK_INPUT_STATUS__NORMAL;

  // enqueue
  if (pData != NULL) {
185 186
    qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
           pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
L
Liu Jicong 已提交
187

188
    pData->type = STREAM_INPUT__DATA_RETRIEVE;
L
Liu Jicong 已提交
189 190
    pData->srcVgId = 0;
    streamRetrieveReqToData(pReq, pData);
191
    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
L
Liu Jicong 已提交
192 193 194 195
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      status = TASK_INPUT_STATUS__FAILED;
    }
196
  } else {  // todo handle oom
L
Liu Jicong 已提交
197 198 199 200 201 202 203 204 205 206 207
    /*streamTaskInputFail(pTask);*/
    /*status = TASK_INPUT_STATUS__FAILED;*/
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
  ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
  SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->streamId = pReq->streamId;
  pCont->rspToTaskId = pReq->srcTaskId;
  pCont->rspFromTaskId = pReq->dstTaskId;
208
  pRsp->pCont = buf;
209
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
L
Liu Jicong 已提交
210
  tmsgSendRsp(pRsp);
211

L
Liu Jicong 已提交
212 213 214
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

215 216
// todo add log
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
dengyihao's avatar
dengyihao 已提交
217
  int32_t code = 0;
218 219
  int32_t type = pTask->outputInfo.type;
  if (type == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
220
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
221
    destroyStreamDataBlock(pBlock);
222
  } else if (type == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
223
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
224
    destroyStreamDataBlock(pBlock);
L
Liu Jicong 已提交
225
  } else {
226 227
    ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
    code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
H
Haojun Liao 已提交
228
    if (code != 0) {  // todo failed to add it into the output queue, free it.
dengyihao's avatar
dengyihao 已提交
229 230
      return code;
    }
231

232
    streamDispatchStreamBlock(pTask);
L
Liu Jicong 已提交
233
  }
234

L
Liu Jicong 已提交
235 236 237
  return 0;
}

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
  int8_t status = 0;

  SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
  if (pBlock == NULL) {
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
    qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
           pTask->id.idStr);
  } else {
    int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
    // input queue is full, upstream is blocked now
    status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
  }

  return status;
}

256 257 258
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
  qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
         pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
L
Liu Jicong 已提交
259

260
  int32_t status = 0;
L
Liu Jicong 已提交
261

262 263 264 265 266 267
  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
  ASSERT(pInfo != NULL);

  if (!pInfo->dataAllowed) {
    qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId);
    status = TASK_INPUT_STATUS__BLOCKED;
L
Liu Jicong 已提交
268
  } else {
269 270 271 272 273 274 275
    // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
    if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
      streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
      qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
    }

    status = streamTaskAppendInputBlocks(pTask, pReq);
276
  }
L
Liu Jicong 已提交
277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
  {
    // do send response with the input status
    int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
    if (code != TSDB_CODE_SUCCESS) {
      // todo handle failure
      return code;
    }

    pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
    tmsgSendRsp(pRsp);
  }

  tDeleteStreamDispatchReq(pReq);
  streamSchedExec(pTask);

L
Liu Jicong 已提交
293 294 295
  return 0;
}

296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
//int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
//  qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
//         pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
//
//  // todo add the input queue buffer limitation
//  streamTaskEnqueueBlocks(pTask, pReq, pRsp);
//  tDeleteStreamDispatchReq(pReq);
//
//  if (exec) {
//    if (streamTryExec(pTask) < 0) {
//      return -1;
//    }
//  } else {
//    streamSchedExec(pTask);
//  }
//
//  return 0;
//}

315
// todo record the idle time for dispatch data
316
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
317 318 319 320 321
  if (code != TSDB_CODE_SUCCESS) {
    // dispatch message failed: network error, or node not available.
    // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
    // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
    // happened too fast. todo handle the shuffle dispatch failure
322 323 324 325 326 327 328
    if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) {
      qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr,
             pRsp->downstreamTaskId, tstrerror(code));
      return code;
    } else {
      qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
             pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
329
      return doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
330 331 332
    }
  }

333
  qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
L
Liu Jicong 已提交
334

335
  // there are other dispatch message not response yet
336
  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
337
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
338
    qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
339 340 341
    if (leftRsp > 0) {
      return 0;
    }
L
Liu Jicong 已提交
342 343
  }

344
  pTask->msgInfo.retryCount = 0;
345
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
346

347
  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);
348

349 350
  // the input queue of the (down stream) task that receive the output data is full,
  // so the TASK_INPUT_STATUS_BLOCKED is rsp
351
  // todo blocking the output status
H
Haojun Liao 已提交
352
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
353 354 355 356 357 358 359 360 361 362
    pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time

    int32_t waitDuration = 300; //  300 ms
    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
           pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
    streamRetryDispatchStreamBlock(pTask, waitDuration);
  } else { // pipeline send data in output queue
    // this message has been sent successfully, let's try next one.
    destroyStreamDataBlock(pTask->msgInfo.pData);
    pTask->msgInfo.pData = NULL;
363

364 365 366 367 368 369 370
    if (pTask->msgInfo.blockingTs != 0) {
      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
      qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el);
      pTask->msgInfo.blockingTs = 0;
    }

    // now ready for next data output
371
    atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
372

373 374
    // otherwise, continue dispatch the first block to down stream task in pipeline
    streamDispatchStreamBlock(pTask);
L
Liu Jicong 已提交
375
  }
H
Haojun Liao 已提交
376

L
Liu Jicong 已提交
377 378 379
  return 0;
}

L
Liu Jicong 已提交
380
int32_t streamProcessRunReq(SStreamTask* pTask) {
L
Liu Jicong 已提交
381 382 383
  if (streamTryExec(pTask) < 0) {
    return -1;
  }
L
Liu Jicong 已提交
384

L
Liu Jicong 已提交
385 386 387
  return 0;
}

L
Liu Jicong 已提交
388 389
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
390
  ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
L
Liu Jicong 已提交
391
  streamSchedExec(pTask);
L
Liu Jicong 已提交
392 393 394
  return 0;
}

395
bool tInputQueueIsFull(const SStreamTask* pTask) {
396
  bool   isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
397
  double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
398
  return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
399 400
}

401
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
402 403 404
  int8_t  type = pItem->type;
  int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
  double  size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
405 406

  if (type == STREAM_INPUT__DATA_SUBMIT) {
407
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
408
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
409
      qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
410
             pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
dengyihao's avatar
dengyihao 已提交
411 412 413
             size);
      streamDataSubmitDestroy(px);
      taosFreeQitem(pItem);
414 415
      return -1;
    }
416

417 418 419
    int32_t msgLen = px->submit.msgLen;
    int64_t ver = px->submit.ver;

420 421 422 423 424 425
    int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamDataSubmitDestroy(px);
      taosFreeQitem(pItem);
      return code;
    }
426

427
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
428
    qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
429
           msgLen, ver, total, size + msgLen/1048576.0);
430 431
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
             type == STREAM_INPUT__REF_DATA_BLOCK) {
432
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
H
Haojun Liao 已提交
433
      qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
434
             pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
H
Haojun Liao 已提交
435
             size);
436
      destroyStreamDataBlock((SStreamDataBlock*) pItem);
437 438 439
      return -1;
    }

440
    qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
441 442 443 444 445
    int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      destroyStreamDataBlock((SStreamDataBlock*) pItem);
      return code;
    }
H
Haojun Liao 已提交
446
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
447
    taosWriteQitem(pTask->inputQueue->queue, pItem);
H
Haojun Liao 已提交
448
    qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
449
  } else if (type == STREAM_INPUT__GET_RES) {
450
    // use the default memory limit, refactor later.
451
    taosWriteQitem(pTask->inputQueue->queue, pItem);
H
Haojun Liao 已提交
452
    qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
453 454 455 456
  }

  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
457
    qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
458 459 460
  }

  return 0;
461 462
}

463 464
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }

465 466 467 468 469 470
void* streamQueueNextItem(SStreamQueue* pQueue) {
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);

  if (flag == STREAM_QUEUE__FAILED) {
    ASSERT(pQueue->qItem != NULL);
    return streamQueueCurItem(pQueue);
471
  } else {
472 473 474 475 476
    pQueue->qItem = NULL;
    taosGetQitem(pQueue->qall, &pQueue->qItem);
    if (pQueue->qItem == NULL) {
      taosReadAllQitems(pQueue->queue, pQueue->qall);
      taosGetQitem(pQueue->qall, &pQueue->qItem);
477
    }
478 479

    return streamQueueCurItem(pQueue);
480
  }
481 482
}

483 484 485 486 487 488 489 490 491 492 493 494 495
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }

SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
  int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  for(int32_t i = 0; i < num; ++i) {
    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
    if (pInfo->taskId == taskId) {
      return pInfo;
    }
  }

  return NULL;
}