stream.c 12.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

dengyihao's avatar
dengyihao 已提交
19
// Input-queue limits of a stream task. tAppendDataToInputQueue() rejects new data for a SOURCE-level task
// once tInputQueueIsFull() reports that either limit is reached: the number of queued items, or the queue
// memory expressed in MiB.
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY         (20480)
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (50)

// Bytes per MiB as a double, so that the division below is carried out in floating point.
#define ONE_MB_F                                  (1024.0 * 1024.0)
// Memory of queue _q as reported by taosQueueMemorySize(), converted to MiB.
#define QUEUE_MEM_SIZE_IN_MB(_q)                  (taosQueueMemorySize(_q) / ONE_MB_F)
23

24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
/*
 * Initialize the process-wide stream environment (the shared "STREAM" timer).
 *
 * streamEnv.inited is used as a tri-state guard: 0 = not initialized, 2 = a thread is currently
 * initializing or cleaning up, 1 = initialized. Callers spin while the guard is held at 2. Only the caller
 * that moves the guard from 0 to 2 creates the timer; every other caller returns 0 immediately.
 *
 * Returns 0 on success or when already initialized. Returns -1 when the timer cannot be created; the guard
 * is then put back to 0 so a later call may retry.
 */
int32_t streamInit() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
  } while (prev == 2);

  if (prev != 0) {
    return 0;
  }

  streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM");
  if (streamEnv.timer == NULL) {
    atomic_store_8(&streamEnv.inited, 0);
    return -1;
  }

  atomic_store_8(&streamEnv.inited, 1);
  return 0;
}

/*
 * Tear down the stream environment created by streamInit().
 *
 * Uses the same tri-state guard on streamEnv.inited: spins while another thread holds it at 2. Only the
 * caller that moves it from 1 (initialized) to 2 releases the timer, then resets the guard to 0. When the
 * environment is not initialized this is a no-op.
 */
void streamCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(streamEnv.timer);
  atomic_store_8(&streamEnv.inited, 0);
}

L
Liu Jicong 已提交
55
void streamSchedByTimer(void* param, void* tmrId) {
56 57
  SStreamTask* pTask = (void*)param;

L
liuyao 已提交
58
  if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
Liu Jicong 已提交
59
    streamMetaReleaseTask(NULL, pTask);
L
Liu Jicong 已提交
60 61 62
    return;
  }

63
  if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
S
Shengliang Guan 已提交
64
    SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
65
    if (trigger == NULL) return;
L
Liu Jicong 已提交
66
    trigger->type = STREAM_INPUT__GET_RES;
67 68 69 70 71 72
    trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
    if (trigger->pBlock == NULL) {
      taosFreeQitem(trigger);
      return;
    }

73
    trigger->pBlock->info.type = STREAM_GET_ALL;
74
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
75

76
    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
L
Liu Jicong 已提交
77 78 79 80
      taosFreeQitem(trigger);
      taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
      return;
    }
81

L
Liu Jicong 已提交
82
    streamSchedExec(pTask);
83 84
  }

L
Liu Jicong 已提交
85
  taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
86 87 88 89
}

/*
 * Arm the periodic trigger timer (streamSchedByTimer) for a task whose triggerParam is non-zero. The value
 * is used as the timer interval.
 *
 * An extra task reference is taken for the timer; the assertion expects refCnt to become exactly 2.
 * Presumably it is released by streamSchedByTimer() via streamMetaReleaseTask() once the task stops —
 * confirm. The trigger starts out INACTIVE.
 *
 * Tasks with triggerParam == 0 are left untouched. Always returns 0.
 */
int32_t streamSetupTrigger(SStreamTask* pTask) {
  if (pTask->triggerParam == 0) {
    return 0;
  }

  int32_t refCnt = atomic_add_fetch_32(&pTask->refCnt, 1);
  ASSERT(refCnt == 2);

  pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
  pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
  return 0;
}

L
Liu Jicong 已提交
98
/*
 * Schedule one execution round of the task by posting a TDMT_STREAM_TASK_RUN message to the STREAM_QUEUE.
 *
 * status.schedStatus is a single-flight guard. Only the caller that moves it from INACTIVE to WAITING posts
 * a message; callers that find it in any other state return 0 without doing anything.
 *
 * Returns:
 *   0  - a run request was posted, or one is already pending;
 *   -1 - the request could not be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or could not be put into the
 *        queue.
 * In both failure cases schedStatus is reset to INACTIVE. Otherwise no run message would ever arrive to
 * clear the WAITING state, and the task could never be scheduled again.
 */
int32_t streamSchedExec(SStreamTask* pTask) {
  int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                     TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    return -1;
  }

  pRunReq->head.vgId = pTask->nodeId;
  pRunReq->streamId = pTask->id.streamId;
  pRunReq->taskId = pTask->id.taskId;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg) != 0) {
    // NOTE(review): pRunReq is intentionally not freed here. This assumes the queue implementation releases
    // msg.pCont when it fails to enqueue — confirm before adding a free, to avoid a double free.
    qError("s-task:%s failed to put run req into stream queue", pTask->id.idStr);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    return -1;
  }

  qDebug("trigger to run s-task:%s", pTask->id.idStr);
  return 0;
}

122
/*
 * Wrap the blocks of a dispatch request in a STREAM_INPUT__DATA_BLOCK item, append it to the task's input
 * queue, and reply to the upstream task through tmsgSendRsp(pRsp).
 *
 * The reply carries one of these input statuses:
 *   TASK_INPUT_STATUS__NORMAL  - the data was queued;
 *   TASK_INPUT_STATUS__BLOCKED - the input queue rejected the item (the item is released here);
 *   TASK_INPUT_STATUS__FAILED  - the item could not be allocated; the task input is marked as failed.
 *
 * Returns 0 when the data was queued, -1 otherwise. When the response buffer itself cannot be allocated,
 * no response is sent, terrno is set to TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
 */
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);

  int8_t status = 0;
  if (pData == NULL) {
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
    qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId,
           pTask->id.idStr);
  } else {
    pData->type = STREAM_INPUT__DATA_BLOCK;
    pData->srcVgId = pReq->dataSrcVgId;

    streamConvertDispatchMsgToData(pReq, pData);
    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      // Input queue is full, so upstream is now blocked. tAppendDataToInputQueue() does not release
      // data-block items it rejects, so release the converted blocks and the item here.
      status = TASK_INPUT_STATUS__BLOCKED;
      taosArrayDestroyEx(pData->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(pData);
    }
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("s-task:%s failed to allocate dispatch rsp, reason: out of memory", pTask->id.idStr);
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
  SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->inputStatus = status;
  pCont->streamId = htobe64(pReq->streamId);
  pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
  pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
  pCont->downstreamNodeId = htonl(pTask->nodeId);
  pCont->downstreamTaskId = htonl(pTask->id.taskId);

  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);

  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

/*
 * Wrap a retrieve request in a STREAM_INPUT__DATA_RETRIEVE item, append it to the task's input queue, and
 * reply to the requesting task through tmsgSendRsp(pRsp).
 *
 * The reply does not carry the enqueue outcome; it is reported only through the return value:
 * 0 when the request was queued, -1 when it could not be allocated or was rejected by the input queue.
 * A rejected item is released here. The task input is NOT marked as failed on allocation failure.
 * When the response buffer itself cannot be allocated, no response is sent, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
 */
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
  int8_t            status = TASK_INPUT_STATUS__NORMAL;

  if (pData == NULL) {
    status = TASK_INPUT_STATUS__FAILED;
    qError("s-task:%s failed to enqueue retrieve req from task %d, reason: out of memory", pTask->id.idStr,
           pReq->srcTaskId);
  } else {
    qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId,
           pReq->srcTaskId, pReq->reqId);

    pData->type = STREAM_INPUT__DATA_RETRIEVE;
    pData->srcVgId = 0;
    streamRetrieveReqToData(pReq, pData);

    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      // tAppendDataToInputQueue() does not release rejected retrieve items; do it here to avoid a leak
      status = TASK_INPUT_STATUS__FAILED;
      taosArrayDestroyEx(pData->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(pData);
    }
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("s-task:%s failed to allocate retrieve rsp, reason: out of memory", pTask->id.idStr);
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
  SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  // NOTE(review): unlike SStreamDispatchRsp these fields are not byte-swapped; kept as-is to preserve the
  // wire format — confirm against the receiver.
  pCont->streamId = pReq->streamId;
  pCont->rspToTaskId = pReq->srcTaskId;
  pCont->rspFromTaskId = pReq->dstTaskId;

  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
  tmsgSendRsp(pRsp);
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

198 199
// todo add log
/*
 * Hand a block of results produced by the task to the task's output. Ownership of pBlock always passes to
 * this function.
 *   - fixed/shuffle dispatch: the block is written to the output queue and streamDispatch() is invoked; if
 *     the queue rejects it, the block is released and the taosWriteQitem() error code is returned;
 *   - TASK_OUTPUT__TABLE: the blocks are passed to tbSink.tbSinkFunc and then released;
 *   - TASK_OUTPUT__SMA: the blocks are passed to smaSink.smaSink and then released.
 * Returns 0 on success.
 */
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  if (pTask->outputType != TASK_OUTPUT__TABLE && pTask->outputType != TASK_OUTPUT__SMA) {
    ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);

    int32_t writeCode = taosWriteQitem(pTask->outputQueue->queue, pBlock);
    if (writeCode != 0) {
      taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(pBlock);
      return writeCode;
    }

    streamDispatch(pTask);
    return 0;
  }

  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
  } else {
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
  }

  // sink outputs consume the blocks immediately, so they are released right away
  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
  return 0;
}

L
Liu Jicong 已提交
224
/*
 * Handle a data dispatch request from an upstream task.
 *
 * The blocks are queued and the upstream task is answered via pRsp; then the request is released with
 * tDeleteStreamDispatchReq(). After that the task is either run inline (exec == true) or scheduled on the
 * stream queue (exec == false).
 *
 * Returns -1 only when the inline execution fails. Enqueue and scheduling outcomes are not reported here.
 */
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
  qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
         pReq->upstreamNodeId);

  // todo add the input queue buffer limitation
  streamTaskEnqueueBlocks(pTask, pReq, pRsp);
  tDeleteStreamDispatchReq(pReq);

  if (!exec) {
    streamSchedExec(pTask);
    return 0;
  }

  return (streamTryExec(pTask) < 0) ? -1 : 0;
}

243
/*
 * Handle a downstream task's response to a dispatch sent by this task.
 *
 * For shuffle dispatch, waitingRspCnt is decremented and nothing else happens until it reaches 0.
 * Presumably that is one response per downstream task — confirm. The task's output status is then replaced
 * by the input status reported in pRsp; the previous output status is asserted to be WAIT. A BLOCKED
 * downstream is not handled yet (asserts). Otherwise dispatching continues.
 *
 * `code` is only logged. Always returns 0.
 *
 * NOTE(review): the entry assertion compares inputStatus with TASK_OUTPUT_STATUS__* constants, while the
 * branch below compares it with TASK_INPUT_STATUS__BLOCKED — confirm the two enums agree.
 */
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
  ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
  qDebug("s-task:%s receive dispatch rsp, code: %x", pTask->id.idStr, code);

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t remaining = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, remaining);
    if (remaining > 0) {
      return 0;
    }
  }

  int8_t prevStatus = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
  ASSERT(prevStatus == TASK_OUTPUT_STATUS__WAIT);

  bool downstreamBlocked = (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED);
  if (downstreamBlocked) {
    // TODO: init recover timer
    ASSERT(0);
    return 0;
  }

  // continue dispatch
  streamDispatch(pTask);
  return 0;
}

L
Liu Jicong 已提交
268
/*
 * Run the task once via streamTryExec(). Presumably this handles the TDMT_STREAM_TASK_RUN messages posted by
 * streamSchedExec() — confirm with the caller. Dispatching of produced results is not triggered from here.
 *
 * Returns -1 when streamTryExec() fails, 0 otherwise.
 */
int32_t streamProcessRunReq(SStreamTask* pTask) {
  int32_t execCode = streamTryExec(pTask);
  return (execCode < 0) ? -1 : 0;
}

L
Liu Jicong 已提交
279
/*
 * Handle a retrieve request sent by another task. The request is queued and the requester is answered via
 * pRsp, then the task is scheduled. The task must not be a SINK-level task.
 * The enqueue outcome is not inspected here. Always returns 0.
 */
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId);

  (void)streamTaskEnqueueRetrieve(pTask, pReq, pRsp);

  ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
  streamSchedExec(pTask);
  return 0;
}

288
bool tInputQueueIsFull(const SStreamTask* pTask) {
dengyihao's avatar
dengyihao 已提交
289
  bool   isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
290 291
  double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
  return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
292 293
}

294
/*
 * Append an item to the task's input queue and, for data items, activate the periodic trigger.
 *
 * Item types and how they are handled:
 *   STREAM_INPUT__DATA_SUBMIT       - rejected when a SOURCE-level task's queue is full (tInputQueueIsFull());
 *                                     on rejection the submit data AND the item are released here;
 *   STREAM_INPUT__DATA_BLOCK,
 *   STREAM_INPUT__DATA_RETRIEVE,
 *   STREAM_INPUT__REF_DATA_BLOCK    - rejected when a SOURCE-level task's queue is full; on rejection the item
 *                                     is NOT released, the caller keeps ownership;
 *   STREAM_INPUT__CHECKPOINT,
 *   STREAM_INPUT__GET_RES           - always queued;
 *   any other type                  - rejected with an error log; the item is NOT released.
 *
 * When an item other than GET_RES/CHECKPOINT is queued and the task has a trigger (triggerParam != 0), the
 * trigger status is moved from INACTIVE to ACTIVE, so the next streamSchedByTimer() tick emits a GET_RES.
 *
 * Returns 0 when the item was queued, -1 when it was rejected.
 */
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
  int8_t  type = pItem->type;
  int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
  double  size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
    qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
           px->submit.msgLen, px->submit.ver, total, size);

    if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
      qError("s-task:%s input queue is full, capacity(num:%d size:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
             pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
             size);
      streamDataSubmitDestroy(px);
      taosFreeQitem(pItem);
      return -1;
    }
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
             type == STREAM_INPUT__REF_DATA_BLOCK) {
    if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
      qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
             pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
             size);
      return -1;
    }

    qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  } else if (type == STREAM_INPUT__GET_RES) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
    qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    // Previously such items were silently dropped (never queued) while success was reported.
    qError("s-task:%s unsupported input item type:%d, not enqueued", pTask->id.idStr, type);
    return -1;
  }

  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
  }

  return 0;
}

338 339
// Return the item currently held by the stream queue (the one last handed out by streamQueueNextItem()).
static void* streamQueueCurItem(SStreamQueue* queue) {
  void* pCurrent = queue->qItem;
  return pCurrent;
}

340 341 342 343 344 345 346 347 348 349 350 351 352 353
/*
 * Fetch the next item to process from the stream queue and mark the queue as PROCESSING.
 *
 * If the previous round left the queue FAILED, the current item (queue->qItem, which must be non-NULL) is
 * returned again so it can be retried. Otherwise the next item is taken from the locally drained batch
 * (queue->qall). If that yields nothing, qall is refilled from queue->queue via taosReadAllQitems() and
 * read once more. The result is presumably NULL when both are empty — confirm taosGetQitem() leaves the
 * output untouched in that case.
 */
void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t prevStatus = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (prevStatus == STREAM_QUEUE__FAILED) {
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  queue->qItem = NULL;
  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem == NULL) {
    taosReadAllQitems(queue->queue, queue->qall);
    taosGetQitem(queue->qall, &queue->qItem);
  }
  return streamQueueCurItem(queue);
}

dengyihao's avatar
dengyihao 已提交
356
// Mark the task's input as failed by atomically setting inputStatus to TASK_INPUT_STATUS__FAILED.
void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}