stream.c 12.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

L
liuyao 已提交
19
// Item-count limit for a task's input queue; compared against taosQueueItemSize() in
// tInputQueueIsFull() and tAppendDataToInputQueue().
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
20
// Memory limit for a task's input queue, in MiB; tAppendDataToInputQueue() compares it against
// taosQueueMemorySize() / 1048576.0.
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100)
21

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
// Initializes the global stream environment (the shared "STREAM" timer) at most once.
//
// streamEnv.inited is used as a small state flag: 0 = not initialized, 1 = initialized,
// 2 = another caller is currently switching between the two states.
//
// Returns 0 when the environment is initialized (by this call or a previous one),
// -1 when the timer could not be created (the flag is put back to 0 in that case).
int32_t streamInit() {
  int8_t prev;

  // Spin for as long as somebody else holds the transitional state.
  do {
    prev = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
  } while (prev == 2);

  if (prev != 0) {
    // Already initialized; nothing to do.
    return 0;
  }

  // We own the 0 -> 2 transition: create the timer and publish the final state.
  streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM");
  const int8_t next = (streamEnv.timer == NULL) ? 0 : 1;
  atomic_store_8(&streamEnv.inited, next);

  return (next == 0) ? -1 : 0;
}

// Tears down the global stream environment if it is currently initialized.
//
// Mirrors streamInit(): waits while streamEnv.inited is in the transitional state 2,
// and only the caller that wins the 1 -> 2 exchange cleans up the timer and resets
// the flag to 0. Calling it on an uninitialized environment is a no-op.
void streamCleanUp() {
  int8_t prev;

  do {
    prev = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(streamEnv.timer);
  atomic_store_8(&streamEnv.inited, 0);
}

L
Liu Jicong 已提交
53
void streamSchedByTimer(void* param, void* tmrId) {
54 55
  SStreamTask* pTask = (void*)param;

L
liuyao 已提交
56
  if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
Liu Jicong 已提交
57
    streamMetaReleaseTask(NULL, pTask);
L
Liu Jicong 已提交
58 59 60
    return;
  }

61
  if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
S
Shengliang Guan 已提交
62
    SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
63
    if (trigger == NULL) return;
L
Liu Jicong 已提交
64
    trigger->type = STREAM_INPUT__GET_RES;
65 66 67 68 69 70
    trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
    if (trigger->pBlock == NULL) {
      taosFreeQitem(trigger);
      return;
    }

71
    trigger->pBlock->info.type = STREAM_GET_ALL;
72
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
73

74
    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
L
Liu Jicong 已提交
75 76 77 78
      taosFreeQitem(trigger);
      taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
      return;
    }
79

L
Liu Jicong 已提交
80
    streamSchedExec(pTask);
81 82
  }

L
Liu Jicong 已提交
83
  taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
84 85 86 87
}

// Starts the periodic trigger timer for a task whose triggerParam is non-zero.
//
// The task's reference count is incremented for the timer (and asserted to become 2),
// the timer is started with streamSchedByTimer as its callback and triggerParam as its
// interval argument, and the trigger status starts out INACTIVE.
//
// Always returns 0; tasks with triggerParam == 0 are left untouched.
int32_t streamSetupTrigger(SStreamTask* pTask) {
  if (pTask->triggerParam == 0) {
    return 0;
  }

  int32_t refAfterInc = atomic_add_fetch_32(&pTask->refCnt, 1);
  ASSERT(refAfterInc == 2);

  pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
  pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;

  return 0;
}

L
Liu Jicong 已提交
96 97
// Schedules a task for execution by posting a TDMT_STREAM_TASK_RUN message to the stream queue.
//
// Only the caller that moves schedStatus from INACTIVE to WAITING posts the message; if the
// task is already in any other scheduling state the call is a no-op.
//
// Returns 0 when the run request was posted or the task was already scheduled, and -1 when
// the request could not be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or could not be put
// into the queue. On both failures schedStatus is reset to INACTIVE so a later call can retry
// instead of leaving the task stuck in WAITING.
int32_t streamSchedExec(SStreamTask* pTask) {
  int8_t schedStatus =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    return -1;
  }

  pRunReq->head.vgId = pTask->nodeId;
  pRunReq->streamId = pTask->id.streamId;
  pRunReq->taskId = pTask->id.taskId;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  int32_t code = tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
  if (code != 0) {
    // NOTE(review): pRunReq is not freed here because ownership of msg.pCont after a failed
    // tmsgPutToQueue() is not visible from this file - confirm against its implementation.
    qError("s-task:%s failed to put run request into stream queue, code: %x", pTask->id.idStr, code);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    return -1;
  }

  qDebug("trigger to run s-task:%s", pTask->id.idStr);
  return 0;
}

120
// Converts a dispatch request into a STREAM_INPUT__DATA_BLOCK item, appends it to the task's
// input queue and answers the upstream task with the resulting input status.
//
// pTask: destination task.
// pReq:  dispatch request received from the upstream task.
// pRsp:  response message; its pCont/contLen are filled in and it is sent via tmsgSendRsp().
//
// The response carries TASK_INPUT_STATUS__NORMAL when the item was queued, __BLOCKED when the
// input queue rejected it, and __FAILED when the queue item could not be allocated.
//
// Returns 0 only for the NORMAL case, -1 otherwise. If the response buffer itself cannot be
// allocated, terrno is set to TSDB_CODE_OUT_OF_MEMORY and -1 is returned without sending a
// response.
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
  int8_t            status;

  // enqueue data block
  if (pData != NULL) {
    pData->type = STREAM_INPUT__DATA_BLOCK;
    pData->srcVgId = pReq->dataSrcVgId;
    streamDispatchReqToData(pReq, pData);

    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      // Input queue is full, upstream is blocked now. The queue did not take ownership of
      // the item, so release it here (same teardown as streamTaskOutput uses).
      taosArrayDestroyEx(pData->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(pData);
      status = TASK_INPUT_STATUS__BLOCKED;
    }
  } else {
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  // Multi-byte fields are converted to network byte order before sending.
  SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->inputStatus = status;
  pCont->streamId = htobe64(pReq->streamId);
  pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
  pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
  pCont->downstreamNodeId = htonl(pTask->nodeId);
  pCont->downstreamTaskId = htonl(pTask->id.taskId);

  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);

  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

// Converts a retrieve request into a STREAM_INPUT__DATA_RETRIEVE item, appends it to the
// task's input queue and sends a SStreamRetrieveRsp back to the requesting task.
//
// pTask: task that receives the retrieve request.
// pReq:  retrieve request.
// pRsp:  response message; its pCont/contLen are filled in and it is sent via tmsgSendRsp().
//
// Returns 0 when the item was queued. Returns -1 when the queue item could not be allocated
// or the input queue rejected it (the response is still sent in both cases), and when the
// response buffer cannot be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY, no response sent).
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
  int8_t            status = TASK_INPUT_STATUS__NORMAL;

  // enqueue
  if (pData != NULL) {
    qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId,
           pReq->srcTaskId, pReq->reqId);

    pData->type = STREAM_INPUT__DATA_RETRIEVE;
    pData->srcVgId = 0;
    streamRetrieveReqToData(pReq, pData);

    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) != 0) {
      // The queue did not take ownership of the item, so release it here
      // (same teardown as streamTaskOutput uses).
      taosArrayDestroyEx(pData->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(pData);
      status = TASK_INPUT_STATUS__FAILED;
    }
  } else {
    // Nothing was queued, so do not report success to the caller. The task-wide input
    // status is left unchanged (streamTaskInputFail() is intentionally not called here).
    status = TASK_INPUT_STATUS__FAILED;
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);

  SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->streamId = pReq->streamId;
  pCont->rspToTaskId = pReq->srcTaskId;
  pCont->rspFromTaskId = pReq->dstTaskId;

  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
  tmsgSendRsp(pRsp);

  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

L
Liu Jicong 已提交
198
// Delivers one result block according to the task's output type.
//
// - TASK_OUTPUT__TABLE / TASK_OUTPUT__SMA: the blocks are handed to the corresponding sink
//   callback, then the block array and the queue item are destroyed here.
// - Fixed / shuffle dispatch (asserted): the item is written to the task's output queue and
//   streamDispatch() is invoked.
//
// Returns 0 on success, or the non-zero code from taosWriteQitem() if queuing failed.
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const bool toTable = (pTask->outputType == TASK_OUTPUT__TABLE);
  const bool toSma = !toTable && (pTask->outputType == TASK_OUTPUT__SMA);

  if (!toTable && !toSma) {
    ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);

    int32_t writeCode = taosWriteQitem(pTask->outputQueue->queue, pBlock);
    if (writeCode != 0) {
      return writeCode;
    }

    streamDispatch(pTask);
    return 0;
  }

  // Sink paths: consume the blocks, then free them together with the queue item.
  if (toTable) {
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
  } else {
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
  }

  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
  return 0;
}

L
Liu Jicong 已提交
219
// Handles a dispatch request from an upstream task: queues its data (which also sends the
// response through pRsp), releases the request, and then either executes the task right away
// (exec == true) or schedules it for execution.
//
// Returns -1 only when exec is true and streamTryExec() reports a failure; 0 otherwise.
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
  qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
         pReq->upstreamNodeId);

  // todo add the input queue buffer limitation
  streamTaskEnqueueBlocks(pTask, pReq, pRsp);
  tDeleteStreamDispatchReq(pReq);

  if (!exec) {
    streamSchedExec(pTask);
    return 0;
  }

  return (streamTryExec(pTask) < 0) ? -1 : 0;
}

238
// Handles the response to a previously dispatched block.
//
// For shuffle dispatch the outstanding-response counter is decremented first and nothing else
// happens until it reaches zero. Afterwards the task's output status is replaced by the
// downstream input status (the previous value is asserted to be WAIT) and dispatching
// continues. A BLOCKED downstream is not supported yet and trips an assertion.
//
// Always returns 0.
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
  ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
  qDebug("s-task:%s receive dispatch rsp, code: %x", pTask->id.idStr, code);

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t pendingRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, pendingRsp);
    if (pendingRsp > 0) {
      // still waiting for other downstream tasks to answer
      return 0;
    }
  }

  int8_t prevStatus = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
  ASSERT(prevStatus == TASK_OUTPUT_STATUS__WAIT);

  if (pRsp->inputStatus != TASK_INPUT_STATUS__BLOCKED) {
    // continue dispatch
    streamDispatch(pTask);
  } else {
    // TODO: init recover timer
    ASSERT(0);
  }

  return 0;
}

L
Liu Jicong 已提交
263
// Handles a run request for the task by attempting to execute it.
//
// Returns -1 when streamTryExec() reports a failure (negative result), 0 otherwise.
int32_t streamProcessRunReq(SStreamTask* pTask) {
  const int32_t execCode = streamTryExec(pTask);
  return (execCode < 0) ? -1 : 0;
}

L
Liu Jicong 已提交
274
// Handles a retrieve request: queues it on the task's input queue (which also sends the
// response through pRsp) and schedules the task for execution. The task is asserted not to
// be a sink-level task. The results of the two calls are not inspected.
//
// Always returns 0.
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId);

  (void)streamTaskEnqueueRetrieve(pTask, pReq, pRsp);

  ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
  (void)streamSchedExec(pTask);

  return 0;
}

283 284 285 286
// Reports whether the number of items in the task's input queue has reached
// STREAM_TASK_INPUT_QUEUEU_CAPACITY.
bool tInputQueueIsFull(const SStreamTask* pTask) {
  if (taosQueueItemSize((pTask->inputQueue->queue)) < STREAM_TASK_INPUT_QUEUEU_CAPACITY) {
    return false;
  }
  return true;
}

287
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
288 289 290
  int8_t type = pItem->type;

  if (type == STREAM_INPUT__DATA_SUBMIT) {
291 292
    SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
    if (pSubmitBlock == NULL) {
293
      qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
294 295 296 297 298
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }

299 300 301
    int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
    double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;

H
Haojun Liao 已提交
302 303
    qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
           pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size);
H
Haojun Liao 已提交
304

305 306
    if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
        (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
H
Haojun Liao 已提交
307
      qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
H
Haojun Liao 已提交
308 309
             STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
             numOfBlocks, size);
310
      streamDataSubmitDestroy(pSubmitBlock);
L
liuyao 已提交
311
      taosFreeQitem(pSubmitBlock);
312 313 314
      return -1;
    }

H
Haojun Liao 已提交
315
    taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
316 317
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
             type == STREAM_INPUT__REF_DATA_BLOCK) {
318
    int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
H
Haojun Liao 已提交
319 320 321 322 323 324 325
    double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;

    if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
        (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
      qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
             pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
             size);
326 327 328
      return -1;
    }

329
    qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks);
330 331 332 333 334 335 336 337 338 339 340 341 342 343
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  } else if (type == STREAM_INPUT__GET_RES) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  }

  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
  }

#if 0
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
344

345
  return 0;
346 347
}

348 349
// Returns the item the stream queue currently points at (queue->qItem); may be NULL.
static void* streamQueueCurItem(SStreamQueue* queue) {
  void* pCurrent = queue->qItem;
  return pCurrent;
}

350 351 352 353 354 355 356 357 358 359 360 361 362 363
// Marks the queue as PROCESSING and returns the item to process next.
//
// If the previous status was FAILED, the current item (asserted non-NULL) is returned again.
// Otherwise the next item is taken from the already-fetched batch (qall); when that batch is
// empty, all pending items are read from the underlying queue into qall and the first of
// them is taken. Returns NULL when no item is available.
void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t prevStatus = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);

  if (prevStatus == STREAM_QUEUE__FAILED) {
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  queue->qItem = NULL;
  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem != NULL) {
    return streamQueueCurItem(queue);
  }

  // local batch exhausted: refill it from the queue and try once more
  taosReadAllQitems(queue->queue, queue->qall);
  taosGetQitem(queue->qall, &queue->qItem);
  return streamQueueCurItem(queue);
}

// Atomically sets the task's input status to TASK_INPUT_STATUS__FAILED.
void streamTaskInputFail(SStreamTask* pTask) {
  const int8_t failedStatus = TASK_INPUT_STATUS__FAILED;
  atomic_store_8(&pTask->inputStatus, failedStatus);
}