stream.c 8.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
int32_t streamInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM");
    if (streamEnv.timer == NULL) {
      atomic_store_8(&streamEnv.inited, 0);
      return -1;
    }
    atomic_store_8(&streamEnv.inited, 1);
  }
  return 0;
}

void streamCleanUp() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(streamEnv.timer);
    atomic_store_8(&streamEnv.inited, 0);
  }
}

L
Liu Jicong 已提交
50
void streamSchedByTimer(void* param, void* tmrId) {
51 52
  SStreamTask* pTask = (void*)param;

L
Liu Jicong 已提交
53 54 55 56
  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    return;
  }

57 58 59
  if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
    SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
    if (trigger == NULL) return;
L
Liu Jicong 已提交
60
    trigger->type = STREAM_INPUT__GET_RES;
61 62 63 64 65 66 67
    trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
    if (trigger->pBlock == NULL) {
      taosFreeQitem(trigger);
      return;
    }
    trigger->pBlock->info.type = STREAM_GET_ALL;

68
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
69

L
Liu Jicong 已提交
70 71 72 73 74
    if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) {
      taosFreeQitem(trigger);
      taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
      return;
    }
L
Liu Jicong 已提交
75
    streamSchedExec(pTask);
76 77
  }

L
Liu Jicong 已提交
78
  taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
79 80 81 82
}

int32_t streamSetupTrigger(SStreamTask* pTask) {
  if (pTask->triggerParam != 0) {
L
Liu Jicong 已提交
83
    pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
84
    pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
85 86 87 88
  }
  return 0;
}

L
Liu Jicong 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
int32_t streamSchedExec(SStreamTask* pTask) {
  int8_t schedStatus =
      atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
  if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
    SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
    if (pRunReq == NULL) {
      atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
      return -1;
    }
    pRunReq->head.vgId = pTask->nodeId;
    pRunReq->streamId = pTask->streamId;
    pRunReq->taskId = pTask->taskId;
    SRpcMsg msg = {
        .msgType = TDMT_STREAM_TASK_RUN,
        .pCont = pRunReq,
        .contLen = sizeof(SStreamTaskRunReq),
    };
    tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
  }
  return 0;
}

L
Liu Jicong 已提交
111
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
L
Liu Jicong 已提交
112
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
L
Liu Jicong 已提交
113 114 115
  int8_t            status;

  // enqueue
L
Liu Jicong 已提交
116
  if (pData != NULL) {
117
    pData->type = STREAM_INPUT__DATA_BLOCK;
L
Liu Jicong 已提交
118
    pData->srcVgId = pReq->dataSrcVgId;
L
Liu Jicong 已提交
119 120
    // decode
    /*pData->blocks = pReq->data;*/
L
Liu Jicong 已提交
121
    /*pBlock->sourceVer = pReq->sourceVer;*/
L
Liu Jicong 已提交
122 123
    streamDispatchReqToData(pReq, pData);
    if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
L
Liu Jicong 已提交
124 125 126 127 128 129 130 131 132 133
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      status = TASK_INPUT_STATUS__FAILED;
    }
  } else {
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
  }

  // rsp by input status
134
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
L
Liu Jicong 已提交
135
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
136
  SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
137
  pCont->inputStatus = status;
138 139 140 141 142
  pCont->streamId = htobe64(pReq->streamId);
  pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
  pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
  pCont->downstreamNodeId = htonl(pTask->nodeId);
  pCont->downstreamTaskId = htonl(pTask->taskId);
L
Liu Jicong 已提交
143 144 145 146 147 148 149 150 151 152 153 154
  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
  int8_t            status = TASK_INPUT_STATUS__NORMAL;

  // enqueue
  if (pData != NULL) {
S
Shengliang Guan 已提交
155
    qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
L
Liu Jicong 已提交
156 157
           pReq->srcTaskId, pReq->reqId);

158
    pData->type = STREAM_INPUT__DATA_RETRIEVE;
L
Liu Jicong 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
    pData->srcVgId = 0;
    // decode
    /*pData->blocks = pReq->data;*/
    /*pBlock->sourceVer = pReq->sourceVer;*/
    streamRetrieveReqToData(pReq, pData);
    if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
      status = TASK_INPUT_STATUS__NORMAL;
    } else {
      status = TASK_INPUT_STATUS__FAILED;
    }
  } else {
    /*streamTaskInputFail(pTask);*/
    /*status = TASK_INPUT_STATUS__FAILED;*/
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
  ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
  SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->streamId = pReq->streamId;
  pCont->rspToTaskId = pReq->srcTaskId;
  pCont->rspFromTaskId = pReq->dstTaskId;
181
  pRsp->pCont = buf;
182
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
L
Liu Jicong 已提交
183 184 185 186
  tmsgSendRsp(pRsp);
  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

L
Liu Jicong 已提交
187
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
L
Liu Jicong 已提交
188 189
  qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
         pReq->upstreamTaskId);
L
Liu Jicong 已提交
190

L
Liu Jicong 已提交
191
  streamTaskEnqueue(pTask, pReq, pRsp);
L
Liu Jicong 已提交
192
  tDeleteStreamDispatchReq(pReq);
L
Liu Jicong 已提交
193

L
Liu Jicong 已提交
194
  if (exec) {
L
Liu Jicong 已提交
195 196 197
    if (streamTryExec(pTask) < 0) {
      return -1;
    }
L
Liu Jicong 已提交
198

199
    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
200
      streamDispatch(pTask);
201
    }
L
Liu Jicong 已提交
202
  } else {
L
Liu Jicong 已提交
203
    streamSchedExec(pTask);
204
  }
L
Liu Jicong 已提交
205 206 207 208

  return 0;
}

209
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
210
  ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
L
Liu Jicong 已提交
211

212
  qDebug("task %d receive dispatch rsp, code: %x", pTask->taskId, code);
L
Liu Jicong 已提交
213

214
  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
215 216 217 218 219
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
    if (leftRsp > 0) return 0;
  }

220 221
  int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
  ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
L
Liu Jicong 已提交
222 223
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    // TODO: init recover timer
L
Liu Jicong 已提交
224
    ASSERT(0);
225
    return 0;
L
Liu Jicong 已提交
226 227
  }
  // continue dispatch
L
Liu Jicong 已提交
228
  streamDispatch(pTask);
L
Liu Jicong 已提交
229 230 231
  return 0;
}

L
Liu Jicong 已提交
232
int32_t streamProcessRunReq(SStreamTask* pTask) {
L
Liu Jicong 已提交
233 234 235
  if (streamTryExec(pTask) < 0) {
    return -1;
  }
L
Liu Jicong 已提交
236

237
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
238
    streamDispatch(pTask);
239
  }
L
Liu Jicong 已提交
240 241 242
  return 0;
}

L
Liu Jicong 已提交
243
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
L
Liu Jicong 已提交
244
  qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
L
Liu Jicong 已提交
245 246 247

  streamTaskEnqueueRetrieve(pTask, pReq, pRsp);

248
  ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
L
Liu Jicong 已提交
249
  streamSchedExec(pTask);
L
Liu Jicong 已提交
250

L
Liu Jicong 已提交
251
  /*streamTryExec(pTask);*/
L
Liu Jicong 已提交
252

L
Liu Jicong 已提交
253
  /*streamDispatch(pTask);*/
L
Liu Jicong 已提交
254 255 256 257 258 259 260 261

  return 0;
}

int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
  //
  return 0;
}