snode.c 8.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
S
Shengliang Guan 已提交
17
#include "sndInt.h"
L
Liu Jicong 已提交
18
#include "tstream.h"
L
Liu Jicong 已提交
19
#include "tuuid.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38

/*
 * Decode a stream dispatch request carried by pMsg and hand it to the target
 * task without executing it (exec == false).
 *
 * pSnode: snode whose stream meta is searched for the task.
 * pMsg:   queued RPC message; pCont holds an SMsgHead followed by the encoded
 *         SStreamDispatchReq. This function releases pMsg->pCont and the queue
 *         item itself on every path.
 *
 * If decoding fails or the task cannot be acquired, a response is sent back
 * (only when the message carries an RPC handle) before the message is freed.
 */
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msgStr = pMsg->pCont;
  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pSnode->pMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return;
  }

  // NOTE(review): when the task is not found, `code` is still 0 here, so the
  // response below reports success -- confirm whether an error code is intended.

FAIL:
  // Only reply when there is an RPC handle to reply to, but always release the
  // message: previously the early return on a NULL handle leaked both the
  // content buffer and the queue item.
  if (pMsg->info.handle != NULL) {
    SRpcMsg rsp = {
        .code = code,
        .info = pMsg->info,
    };
    tmsgSendRsp(&rsp);
  }
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

/*
 * Fill in the runtime state of an aggregate-level stream task: reference
 * count, scheduling/input/output status, queues, message callbacks, start
 * version, state backend and executor.
 *
 * pSnode: owning snode; supplies the message callbacks and the state path.
 * pTask:  task to expand; must be TASK_LEVEL__AGG with a non-empty
 *         childEpInfo array (both are asserted).
 * ver:    stored as the task's start version.
 *
 * Returns 0 on success, -1 if a queue or the stream state cannot be opened.
 */
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
  ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);

  pTask->refCnt = 1;
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  // Both queues are opened before either result is checked.
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (!pTask->inputQueue || !pTask->outputQueue) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pSnode->msgCb;
  pTask->startVer = ver;

  pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
  if (!pTask->pState) {
    return -1;
  }

  // The executor gets no vnode; it is given the number of child endpoints as
  // numOfVgroups and the freshly opened state as its state backend.
  int32_t     numOfChildren = (int32_t)taosArrayGetSize(pTask->childEpInfo);
  SReadHandle mgHandle = {
      .vnode = NULL,
      .numOfVgroups = numOfChildren,
      .pStateBackend = pTask->pState,
  };
  pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
  ASSERT(pTask->exec.executor);

  return 0;
}
H
refact  
Hongze Cheng 已提交
98

S
Shengliang Guan 已提交
99
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
wafwerar's avatar
wafwerar 已提交
100
  SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
L
Liu Jicong 已提交
101
  if (pSnode == NULL) {
L
Liu Jicong 已提交
102
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
103 104
    return NULL;
  }
L
Liu Jicong 已提交
105 106 107 108 109
  pSnode->path = strdup(path);
  if (pSnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
S
Shengliang Guan 已提交
110
  pSnode->msgCb = pOption->msgCb;
L
Liu Jicong 已提交
111 112

  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE);
L
Liu Jicong 已提交
113
  if (pSnode->pMeta == NULL) {
L
Liu Jicong 已提交
114 115
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
L
Liu Jicong 已提交
116
  }
L
Liu Jicong 已提交
117

S
Shengliang Guan 已提交
118
  return pSnode;
L
Liu Jicong 已提交
119 120 121 122 123

FAIL:
  taosMemoryFree(pSnode->path);
  taosMemoryFree(pSnode);
  return NULL;
H
refact  
Hongze Cheng 已提交
124 125
}

L
Liu Jicong 已提交
126
/*
 * Shut down an snode created by sndOpen: commit and close its stream meta,
 * then free the duplicated path and the snode structure itself.
 *
 * pSnode must not be used after this call.
 */
void sndClose(SSnode *pSnode) {
  // Commit first, then close the meta.
  streamMetaCommit(pSnode->pMeta);
  streamMetaClose(pSnode->pMeta);

  // Release the memory owned by the snode.
  taosMemoryFree(pSnode->path);
  taosMemoryFree(pSnode);
}
S
Shengliang Guan 已提交
132 133 134

/*
 * Load reporting hook for the snode. Currently a stub: neither argument is
 * read or written and 0 is always returned.
 */
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) {
  (void)pSnode;
  (void)pLoad;
  return 0;
}

L
Liu Jicong 已提交
135 136
/*
 * Deploy a stream task on this snode from its serialized form.
 *
 * pSnode: snode whose stream meta receives the task.
 * msg:    encoded SStreamTask.
 * msgLen: length of msg in bytes.
 *
 * Returns 0 on success, -1 if allocation, decoding, or adding the task to the
 * meta fails.
 */
int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
  // Step 1: allocate a task and deserialize the message into it.
  SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (!pTask) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
  int32_t decodeRes = tDecodeSStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);

  // Step 2: register the task in the stream meta.
  // NOTE(review): pTask is not freed here when the add fails -- confirm
  // whether streamMetaAddTask takes ownership on failure.
  if (streamMetaAddTask(pSnode->pMeta, -1, pTask) < 0) {
    return -1;
  }

  // Step 3: tasks flagged fillHistory go through the recover preparation.
  if (pTask->fillHistory) {
    streamSetParamForRecover(pTask);
    streamAggRecoverPrepare(pTask);
  }

  return 0;
}
L
Liu Jicong 已提交
169

L
Liu Jicong 已提交
170 171
/*
 * Remove a stream task from this snode.
 *
 * pSnode: snode whose stream meta holds the task.
 * msg:    interpreted in place as an SVDropStreamTaskReq; only taskId is read.
 * msgLen: unused.
 *
 * Always returns 0.
 */
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
  SVDropStreamTaskReq *pDropReq = (SVDropStreamTaskReq *)msg;

  streamMetaRemoveTask(pSnode->pMeta, pDropReq->taskId);

  return 0;
}

L
Liu Jicong 已提交
176
/*
 * Handle a "run task" request.
 *
 * pMsg->pCont is read directly as an SStreamTaskRunReq. The task named by its
 * taskId is acquired from the snode's stream meta, run via
 * streamProcessRunReq, and released again.
 *
 * Returns 0 when the task was found, -1 otherwise.
 */
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
  SStreamTaskRunReq *pRunReq = pMsg->pCont;
  int32_t            taskId = pRunReq->taskId;

  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  streamMetaReleaseTask(pSnode->pMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
189 190 191 192
/*
 * Handle a stream dispatch request addressed to a task on this snode.
 *
 * pSnode: snode whose stream meta is searched for the task.
 * pMsg:   RPC message; pCont holds an SMsgHead followed by the encoded
 *         SStreamDispatchReq. The message is not freed here.
 * exec:   forwarded unchanged to streamProcessDispatchReq.
 *
 * Returns 0 when the request was handed to the task, -1 when the request
 * cannot be decoded or the task is not found.
 */
int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
  char   *msgStr = pMsg->pCont;
  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
  int32_t decodeRes = tDecodeStreamDispatchReq(&decoder, &req);
  // Always release the decoder, matching sndEnqueueStreamDispatch, which also
  // clears it before the decoded request is used.
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    // req is not reliably populated; do not look up a task from it.
    return -1;
  }

  int32_t taskId = req.taskId;

  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pSnode->pMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
214
/*
 * Handle a stream retrieve request addressed to a task on this snode.
 *
 * pSnode: snode whose stream meta is searched for the destination task.
 * pMsg:   RPC message; pCont holds an SMsgHead followed by the encoded
 *         SStreamRetrieveReq. The message is not freed here.
 *
 * Returns 0 when the request was processed by the task, -1 when the request
 * cannot be decoded or the destination task is not found. The decoded request
 * is released with tDeleteStreamRetrieveReq on every path.
 */
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msgStr = pMsg->pCont;
  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = -1;

  // Zero-initialised so the cleanup below sees NULL/0 fields if decoding
  // stops early.
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
  int32_t decodeRes = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    goto _exit;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
  if (pTask == NULL) {
    goto _exit;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessRetrieveReq(pTask, &req, &rsp);
  streamMetaReleaseTask(pSnode->pMeta, pTask);
  code = 0;

_exit:
  // Previously skipped when the task was not found, leaking the decoded
  // request contents.
  tDeleteStreamRetrieveReq(&req);
  return code;
}

/*
 * Handle the response to a dispatch previously sent by one of this snode's
 * tasks.
 *
 * The SStreamDispatchRsp is read in place just past the SMsgHead in
 * pMsg->pCont; its upstreamTaskId is converted with ntohl and used to find
 * the task, which then receives the response together with pMsg->code.
 *
 * Returns 0 when the task was found, -1 otherwise.
 */
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  SStreamDispatchRsp *pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             taskId = ntohl(pDispatchRsp->upstreamTaskId);

  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp, pMsg->code);
  streamMetaReleaseTask(pSnode->pMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
253
/*
 * Handler for retrieve responses. Currently a stub: the message is ignored
 * and 0 is returned.
 */
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  (void)pSnode;
  (void)pMsg;
  return 0;
}

L
Liu Jicong 已提交
258 259 260
/*
 * Dispatch a write-queue message to the matching task handler.
 *
 * pSnode: target snode.
 * pMsg:   RPC message; the payload handed to the handler starts just past the
 *         SMsgHead and its length excludes the head.
 * pRsp:   unused.
 *
 * Returns the handler's result for TDMT_STREAM_TASK_DEPLOY and
 * TDMT_STREAM_TASK_DROP. Any other message type trips ASSERT(0) and, if
 * execution continues, yields 0.
 */
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  void   *pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
    return sndProcessTaskDeployReq(pSnode, pBody, bodyLen);
  }
  if (pMsg->msgType == TDMT_STREAM_TASK_DROP) {
    return sndProcessTaskDropReq(pSnode, pBody, bodyLen);
  }

  // Unknown message type for the write queue.
  ASSERT(0);
  return 0;
}

L
Liu Jicong 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284
/*
 * Handle a "recover finished" notification addressed to a task on this snode.
 *
 * pSnode: snode whose stream meta is searched for the task.
 * pMsg:   RPC message; pCont holds an SMsgHead followed by the encoded
 *         SStreamRecoverFinishReq. The message is not freed here.
 *
 * Returns 0 on success, -1 when the request cannot be decoded, the task is
 * not found, or streamProcessRecoverFinishReq reports failure.
 */
int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // Deserialize the request; bail out on malformed input instead of using an
  // uninitialized taskId/childId.
  SStreamRecoverFinishReq req;
  SDecoder                decoder;
  tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
  int32_t decodeRes = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    return -1;
  }

  // Find the task.
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // Process the request; the task is released whatever the outcome.
  int32_t code = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;
  streamMetaReleaseTask(pSnode->pMeta, pTask);
  return code;
}

/*
 * Handler for "recover finished" responses. Currently a stub: the message is
 * ignored and 0 is returned.
 */
int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  (void)pSnode;
  (void)pMsg;
  return 0;
}

L
Liu Jicong 已提交
304
/*
 * Dispatch a stream-queue message to the handler for its message type.
 *
 * Returns whatever the selected handler returns. An unrecognised message type
 * trips ASSERT(0) and, if execution continues, yields 0.
 */
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_RUN:
      code = sndProcessTaskRunReq(pSnode, pMsg);
      break;
    case TDMT_STREAM_TASK_DISPATCH:
      // Dispatch requests arriving on this path are executed immediately.
      code = sndProcessTaskDispatchReq(pSnode, pMsg, true);
      break;
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      code = sndProcessTaskDispatchRsp(pSnode, pMsg);
      break;
    case TDMT_STREAM_RETRIEVE:
      code = sndProcessTaskRetrieveReq(pSnode, pMsg);
      break;
    case TDMT_STREAM_RETRIEVE_RSP:
      code = sndProcessTaskRetrieveRsp(pSnode, pMsg);
      break;
    case TDMT_STREAM_RECOVER_FINISH:
      code = sndProcessTaskRecoverFinishReq(pSnode, pMsg);
      break;
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      code = sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
      break;
    default:
      ASSERT(0);
      break;
  }

  return code;
}