snode.c 9.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
S
Shengliang Guan 已提交
17
#include "sndInt.h"
L
Liu Jicong 已提交
18
#include "tstream.h"
L
Liu Jicong 已提交
19
#include "tuuid.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msgStr = pMsg->pCont;
  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
35

L
Liu Jicong 已提交
36 37 38 39
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

L
Liu Jicong 已提交
40
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
41 42 43 44 45 46
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
L
Liu Jicong 已提交
47
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return;
  }

FAIL:
  if (pMsg->info.handle == NULL) return;
  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
  };
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
  ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);

L
Liu Jicong 已提交
68
  pTask->refCnt = 1;
69
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
L
Liu Jicong 已提交
70 71 72 73 74 75 76 77 78 79
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pSnode->msgCb;
80
  pTask->chkInfo.version = ver;
81
  pTask->pMeta = pSnode->pMeta;
L
Liu Jicong 已提交
82 83 84 85 86 87

  pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
  if (pTask->pState == NULL) {
    return -1;
  }

88 89
  int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
  SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
90

91 92
  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
  ASSERT(pTask->exec.pExecutor);
L
Liu Jicong 已提交
93

L
Liu Jicong 已提交
94
  streamSetupTrigger(pTask);
L
Liu Jicong 已提交
95 96
  return 0;
}
H
refact  
Hongze Cheng 已提交
97

S
Shengliang Guan 已提交
98
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
wafwerar's avatar
wafwerar 已提交
99
  SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
L
Liu Jicong 已提交
100
  if (pSnode == NULL) {
L
Liu Jicong 已提交
101
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
102 103
    return NULL;
  }
104
  pSnode->path = taosStrdup(path);
L
Liu Jicong 已提交
105 106 107 108
  if (pSnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
S
Shengliang Guan 已提交
109
  pSnode->msgCb = pOption->msgCb;
L
Liu Jicong 已提交
110 111

  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE);
L
Liu Jicong 已提交
112
  if (pSnode->pMeta == NULL) {
L
Liu Jicong 已提交
113 114
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
L
Liu Jicong 已提交
115
  }
L
Liu Jicong 已提交
116

S
Shengliang Guan 已提交
117
  return pSnode;
L
Liu Jicong 已提交
118 119 120 121 122

FAIL:
  taosMemoryFree(pSnode->path);
  taosMemoryFree(pSnode);
  return NULL;
H
refact  
Hongze Cheng 已提交
123 124
}

L
Liu Jicong 已提交
125
void sndClose(SSnode *pSnode) {
126
  streamMetaCommit(pSnode->pMeta);
L
Liu Jicong 已提交
127 128
  streamMetaClose(pSnode->pMeta);
  taosMemoryFree(pSnode->path);
wafwerar's avatar
wafwerar 已提交
129
  taosMemoryFree(pSnode);
L
Liu Jicong 已提交
130
}
S
Shengliang Guan 已提交
131 132 133

int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }

L
Liu Jicong 已提交
134 135
int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
  int32_t code;
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
  // 1.deserialize msg and build task
L
Liu Jicong 已提交
138 139 140 141
  SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
142

L
Liu Jicong 已提交
143 144
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
145
  code = tDecodeStreamTask(&decoder, pTask);
L
Liu Jicong 已提交
146 147 148 149
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
L
Liu Jicong 已提交
150 151 152
  }
  tDecoderClear(&decoder);

L
Liu Jicong 已提交
153
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
L
Liu Jicong 已提交
154

L
Liu Jicong 已提交
155
  // 2.save task
156
  taosWLockLatch(&pSnode->pMeta->lock);
157
  code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
L
Liu Jicong 已提交
158
  if (code < 0) {
159
    taosWUnLockLatch(&pSnode->pMeta->lock);
L
Liu Jicong 已提交
160 161
    return -1;
  }
L
Liu Jicong 已提交
162

163 164
  taosWUnLockLatch(&pSnode->pMeta->lock);

L
Liu Jicong 已提交
165 166 167 168 169
  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamSetParamForRecover(pTask);
    streamAggRecoverPrepare(pTask);
  }
L
Liu Jicong 已提交
170

L
Liu Jicong 已提交
171
  return 0;
L
Liu Jicong 已提交
172
}
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174 175
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
  SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
176
  streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
L
Liu Jicong 已提交
177
  return 0;
L
Liu Jicong 已提交
178 179
}

L
Liu Jicong 已提交
180
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
181 182
  SStreamTaskRunReq *pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
L
Liu Jicong 已提交
183
  SStreamTask       *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
184 185
  if (pTask) {
    streamProcessRunReq(pTask);
L
Liu Jicong 已提交
186
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
187 188 189 190
    return 0;
  } else {
    return -1;
  }
L
Liu Jicong 已提交
191 192
}

L
Liu Jicong 已提交
193 194 195 196
int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
  char              *msgStr = pMsg->pCont;
  char              *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
L
Liu Jicong 已提交
197 198
  SStreamDispatchReq req;
  SDecoder           decoder;
L
Liu Jicong 已提交
199
  tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
L
Liu Jicong 已提交
200
  tDecodeStreamDispatchReq(&decoder, &req);
L
Liu Jicong 已提交
201 202
  int32_t taskId = req.taskId;

L
Liu Jicong 已提交
203
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
204 205 206 207 208 209
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, exec);
L
Liu Jicong 已提交
210
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
211 212 213
    return 0;
  } else {
    return -1;
L
Liu Jicong 已提交
214
  }
L
Liu Jicong 已提交
215
  return 0;
L
Liu Jicong 已提交
216 217
}

L
Liu Jicong 已提交
218
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
219 220 221 222 223 224 225
  char              *msgStr = pMsg->pCont;
  char              *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
L
Liu Jicong 已提交
226
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
227
  int32_t      taskId = req.dstTaskId;
L
Liu Jicong 已提交
228
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
229 230 231 232 233 234
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
L
Liu Jicong 已提交
235
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
236
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
237
    return 0;
L
Liu Jicong 已提交
238 239 240 241 242 243 244
  } else {
    return -1;
  }
}

int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
L
Liu Jicong 已提交
245
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);
L
Liu Jicong 已提交
246
  SStreamTask        *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
247
  if (pTask) {
248
    streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
L
Liu Jicong 已提交
249
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
250 251 252
    return 0;
  } else {
    return -1;
L
Liu Jicong 已提交
253 254 255 256
  }
  return 0;
}

L
Liu Jicong 已提交
257
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
258 259 260 261
  //
  return 0;
}

L
Liu Jicong 已提交
262 263 264
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
L
Liu Jicong 已提交
265 266
  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_DEPLOY:
L
Liu Jicong 已提交
267
      return sndProcessTaskDeployReq(pSnode, pReq, len);
L
Liu Jicong 已提交
268
    case TDMT_STREAM_TASK_DROP:
L
Liu Jicong 已提交
269
      return sndProcessTaskDropReq(pSnode, pReq, len);
L
Liu Jicong 已提交
270
    default:
S
shm  
Shengliang Guan 已提交
271
      ASSERT(0);
L
Liu Jicong 已提交
272
  }
L
Liu Jicong 已提交
273
  return 0;
L
Liu Jicong 已提交
274 275
}

L
Liu Jicong 已提交
276 277 278 279 280 281 282 283 284 285 286 287 288
int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, msg, msgLen);
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  // find task
L
Liu Jicong 已提交
289
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId);
L
Liu Jicong 已提交
290 291 292 293 294
  if (pTask == NULL) {
    return -1;
  }
  // do process request
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
L
Liu Jicong 已提交
295
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
296 297 298
    return -1;
  }

L
Liu Jicong 已提交
299
  streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
300 301 302 303 304 305 306 307
  return 0;
}

int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  //
  return 0;
}

L
Liu Jicong 已提交
308
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
309 310 311 312
  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_RUN:
      return sndProcessTaskRunReq(pSnode, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
313
      return sndProcessTaskDispatchReq(pSnode, pMsg, true);
L
Liu Jicong 已提交
314 315
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return sndProcessTaskDispatchRsp(pSnode, pMsg);
L
Liu Jicong 已提交
316 317
    case TDMT_STREAM_RETRIEVE:
      return sndProcessTaskRetrieveReq(pSnode, pMsg);
L
Liu Jicong 已提交
318
    case TDMT_STREAM_RETRIEVE_RSP:
5
54liuyao 已提交
319
      return sndProcessTaskRetrieveRsp(pSnode, pMsg);
L
Liu Jicong 已提交
320 321 322 323
    case TDMT_STREAM_RECOVER_FINISH:
      return sndProcessTaskRecoverFinishReq(pSnode, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
L
Liu Jicong 已提交
324 325 326 327
    default:
      ASSERT(0);
  }
  return 0;
L
Liu Jicong 已提交
328
}