stream.c 9.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
int32_t streamInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM");
    if (streamEnv.timer == NULL) {
      atomic_store_8(&streamEnv.inited, 0);
      return -1;
    }
    atomic_store_8(&streamEnv.inited, 1);
  }
  return 0;
}

void streamCleanUp() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(streamEnv.timer);
    atomic_store_8(&streamEnv.inited, 0);
  }
}

L
Liu Jicong 已提交
50
void streamSchedByTimer(void* param, void* tmrId) {
51 52
  SStreamTask* pTask = (void*)param;

L
Liu Jicong 已提交
53 54 55 56
  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    return;
  }

57 58 59
  if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
    SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
    if (trigger == NULL) return;
L
Liu Jicong 已提交
60
    trigger->type = STREAM_INPUT__GET_RES;
61 62 63 64 65 66 67
    trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
    if (trigger->pBlock == NULL) {
      taosFreeQitem(trigger);
      return;
    }
    trigger->pBlock->info.type = STREAM_GET_ALL;

68
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
69 70

    streamTaskInput(pTask, (SStreamQueueItem*)trigger);
L
Liu Jicong 已提交
71
    streamSchedExec(pTask);
72 73
  }

L
Liu Jicong 已提交
74
  taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
75 76 77 78
}

int32_t streamSetupTrigger(SStreamTask* pTask) {
  if (pTask->triggerParam != 0) {
L
Liu Jicong 已提交
79
    pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
80
    pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
81 82 83 84
  }
  return 0;
}

L
Liu Jicong 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
int32_t streamSchedExec(SStreamTask* pTask) {
  int8_t schedStatus =
      atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
  if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
    SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
    if (pRunReq == NULL) {
      atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
      return -1;
    }
    pRunReq->head.vgId = pTask->nodeId;
    pRunReq->streamId = pTask->streamId;
    pRunReq->taskId = pTask->taskId;
    SRpcMsg msg = {
        .msgType = TDMT_STREAM_TASK_RUN,
        .pCont = pRunReq,
        .contLen = sizeof(SStreamTaskRunReq),
    };
    tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
  }
  return 0;
}

L
Liu Jicong 已提交
107
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
L
Liu Jicong 已提交
108
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
L
Liu Jicong 已提交
109 110 111
  int8_t            status;

  // enqueue
L
Liu Jicong 已提交
112
  if (pData != NULL) {
113
    pData->type = STREAM_INPUT__DATA_BLOCK;
L
Liu Jicong 已提交
114
    pData->srcVgId = pReq->dataSrcVgId;
L
Liu Jicong 已提交
115 116
    // decode
    /*pData->blocks = pReq->data;*/
L
Liu Jicong 已提交
117
    /*pBlock->sourceVer = pReq->sourceVer;*/
L
Liu Jicong 已提交
118 119
    streamDispatchReqToData(pReq, pData);
    if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
L
Liu Jicong 已提交
120 121 122 123 124 125 126 127 128 129
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      status = TASK_INPUT_STATUS__FAILED;
    }
  } else {
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
  }

  // rsp by input status
130
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
L
Liu Jicong 已提交
131
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
132
  SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
133 134
  pCont->inputStatus = status;
  pCont->streamId = pReq->streamId;
L
Liu Jicong 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147
  pCont->taskId = pReq->upstreamTaskId;
  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
  int8_t            status = TASK_INPUT_STATUS__NORMAL;

  // enqueue
  if (pData != NULL) {
S
Shengliang Guan 已提交
148
    qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
L
Liu Jicong 已提交
149 150
           pReq->srcTaskId, pReq->reqId);

151
    pData->type = STREAM_INPUT__DATA_RETRIEVE;
L
Liu Jicong 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
    pData->srcVgId = 0;
    // decode
    /*pData->blocks = pReq->data;*/
    /*pBlock->sourceVer = pReq->sourceVer;*/
    streamRetrieveReqToData(pReq, pData);
    if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      status = TASK_INPUT_STATUS__FAILED;
    }
  } else {
    /*streamTaskInputFail(pTask);*/
    /*status = TASK_INPUT_STATUS__FAILED;*/
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
  ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
  SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->streamId = pReq->streamId;
  pCont->rspToTaskId = pReq->srcTaskId;
  pCont->rspFromTaskId = pReq->dstTaskId;
174
  pRsp->pCont = buf;
175
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
L
Liu Jicong 已提交
176 177 178 179
  tmsgSendRsp(pRsp);
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

L
Liu Jicong 已提交
180
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
L
Liu Jicong 已提交
181 182
  qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
         pReq->upstreamTaskId);
L
Liu Jicong 已提交
183

L
Liu Jicong 已提交
184
  streamTaskEnqueue(pTask, pReq, pRsp);
L
Liu Jicong 已提交
185
  tDeleteStreamDispatchReq(pReq);
L
Liu Jicong 已提交
186

L
Liu Jicong 已提交
187
  if (exec) {
L
Liu Jicong 已提交
188 189 190
    if (streamTryExec(pTask) < 0) {
      return -1;
    }
L
Liu Jicong 已提交
191

192
    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
193
      streamDispatch(pTask);
194
    }
L
Liu Jicong 已提交
195
  } else {
L
Liu Jicong 已提交
196
    streamSchedExec(pTask);
197
  }
L
Liu Jicong 已提交
198 199 200 201

  return 0;
}

L
Liu Jicong 已提交
202
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
203
  ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
L
Liu Jicong 已提交
204

L
Liu Jicong 已提交
205
  qDebug("task %d receive dispatch rsp", pTask->taskId);
L
Liu Jicong 已提交
206

207
  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
208 209 210 211 212
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
    if (leftRsp > 0) return 0;
  }

213 214
  int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
  ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
L
Liu Jicong 已提交
215 216
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    // TODO: init recover timer
L
Liu Jicong 已提交
217
    ASSERT(0);
218
    return 0;
L
Liu Jicong 已提交
219 220
  }
  // continue dispatch
L
Liu Jicong 已提交
221
  streamDispatch(pTask);
L
Liu Jicong 已提交
222 223 224
  return 0;
}

L
Liu Jicong 已提交
225
int32_t streamProcessRunReq(SStreamTask* pTask) {
L
Liu Jicong 已提交
226 227 228
  if (streamTryExec(pTask) < 0) {
    return -1;
  }
L
Liu Jicong 已提交
229

230
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
231
    streamDispatch(pTask);
232
  }
L
Liu Jicong 已提交
233 234 235
  return 0;
}

L
Liu Jicong 已提交
236
#if 0
L
Liu Jicong 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->inputStatus = pTask->inputStatus;
  pCont->streamId = pTask->streamId;
  pCont->reqTaskId = pTask->taskId;
  pCont->rspTaskId = pReq->upstreamTaskId;

  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp);
  tmsgSendRsp(pRsp);
L
Liu Jicong 已提交
250 251 252
  return 0;
}

L
Liu Jicong 已提交
253 254
int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
  streamProcessRunReq(pTask);
L
Liu Jicong 已提交
255

L
Liu Jicong 已提交
256 257 258 259 260 261 262
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    // scan data to recover
    pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
    pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
    qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
    if (streamPipelineExec(pTask, 100, true) < 0) {
      return -1;
L
Liu Jicong 已提交
263
    }
L
Liu Jicong 已提交
264 265 266
  } else {
    pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
    pTask->taskStatus = TASK_STATUS__NORMAL;
L
Liu Jicong 已提交
267 268
  }

L
Liu Jicong 已提交
269 270
  return 0;
}
L
Liu Jicong 已提交
271
#endif
L
Liu Jicong 已提交
272 273

int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
L
Liu Jicong 已提交
274
  qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
L
Liu Jicong 已提交
275 276 277

  streamTaskEnqueueRetrieve(pTask, pReq, pRsp);

278
  ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
L
Liu Jicong 已提交
279
  streamSchedExec(pTask);
L
Liu Jicong 已提交
280

L
Liu Jicong 已提交
281
  /*streamTryExec(pTask);*/
L
Liu Jicong 已提交
282

L
Liu Jicong 已提交
283
  /*streamDispatch(pTask);*/
L
Liu Jicong 已提交
284 285 286 287 288 289 290 291

  return 0;
}

int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
  //
  return 0;
}