snode.c 9.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
S
Shengliang Guan 已提交
17
#include "sndInt.h"
L
Liu Jicong 已提交
18
#include "tstream.h"
L
Liu Jicong 已提交
19
#include "tuuid.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msgStr = pMsg->pCont;
  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
35

L
Liu Jicong 已提交
36 37 38 39
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

L
Liu Jicong 已提交
40
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
41 42 43 44 45
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
46
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
L
Liu Jicong 已提交
47
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return;
  }

FAIL:
  if (pMsg->info.handle == NULL) return;
  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
  };
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
H
Haojun Liao 已提交
65
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0);
L
Liu Jicong 已提交
66

L
Liu Jicong 已提交
67
  pTask->refCnt = 1;
68
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
dengyihao's avatar
dengyihao 已提交
69 70 71

  pTask->inputQueue = streamQueueOpen(0);
  pTask->outputQueue = streamQueueOpen(0);
L
Liu Jicong 已提交
72 73 74 75 76 77 78 79

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pSnode->msgCb;
80
  pTask->chkInfo.version = ver;
81
  pTask->pMeta = pSnode->pMeta;
L
Liu Jicong 已提交
82 83 84 85 86 87

  pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
  if (pTask->pState == NULL) {
    return -1;
  }

88
  int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
89
  SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
H
Haojun Liao 已提交
90
  initStreamStateAPI(&handle.api);
D
dapan1121 已提交
91

92
  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
93
  ASSERT(pTask->exec.pExecutor);
L
Liu Jicong 已提交
94

L
Liu Jicong 已提交
95
  streamSetupTrigger(pTask);
L
Liu Jicong 已提交
96 97
  return 0;
}
H
refact  
Hongze Cheng 已提交
98

S
Shengliang Guan 已提交
99
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
wafwerar's avatar
wafwerar 已提交
100
  SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
L
Liu Jicong 已提交
101
  if (pSnode == NULL) {
L
Liu Jicong 已提交
102
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
103 104
    return NULL;
  }
105
  pSnode->path = taosStrdup(path);
L
Liu Jicong 已提交
106 107 108 109
  if (pSnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
S
Shengliang Guan 已提交
110
  pSnode->msgCb = pOption->msgCb;
L
Liu Jicong 已提交
111 112

  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE);
L
Liu Jicong 已提交
113
  if (pSnode->pMeta == NULL) {
L
Liu Jicong 已提交
114 115
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
L
Liu Jicong 已提交
116
  }
L
Liu Jicong 已提交
117

S
Shengliang Guan 已提交
118
  return pSnode;
L
Liu Jicong 已提交
119 120 121 122 123

FAIL:
  taosMemoryFree(pSnode->path);
  taosMemoryFree(pSnode);
  return NULL;
H
refact  
Hongze Cheng 已提交
124 125
}

L
Liu Jicong 已提交
126
void sndClose(SSnode *pSnode) {
127
  streamMetaCommit(pSnode->pMeta);
L
Liu Jicong 已提交
128 129
  streamMetaClose(pSnode->pMeta);
  taosMemoryFree(pSnode->path);
wafwerar's avatar
wafwerar 已提交
130
  taosMemoryFree(pSnode);
L
Liu Jicong 已提交
131
}
S
Shengliang Guan 已提交
132 133 134

int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }

L
Liu Jicong 已提交
135 136
int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
  int32_t code;
L
Liu Jicong 已提交
137

L
Liu Jicong 已提交
138
  // 1.deserialize msg and build task
L
Liu Jicong 已提交
139 140 141 142
  SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
143

L
Liu Jicong 已提交
144 145
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
146
  code = tDecodeStreamTask(&decoder, pTask);
L
Liu Jicong 已提交
147 148 149 150
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
L
Liu Jicong 已提交
151 152 153
  }
  tDecoderClear(&decoder);

L
Liu Jicong 已提交
154
  ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
L
Liu Jicong 已提交
155

L
Liu Jicong 已提交
156
  // 2.save task
157
  taosWLockLatch(&pSnode->pMeta->lock);
158
  code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
L
Liu Jicong 已提交
159
  if (code < 0) {
160
    taosWUnLockLatch(&pSnode->pMeta->lock);
L
Liu Jicong 已提交
161 162
    return -1;
  }
L
Liu Jicong 已提交
163

164 165
  taosWUnLockLatch(&pSnode->pMeta->lock);

L
Liu Jicong 已提交
166 167 168 169 170
  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamSetParamForRecover(pTask);
    streamAggRecoverPrepare(pTask);
  }
L
Liu Jicong 已提交
171

L
Liu Jicong 已提交
172
  return 0;
L
Liu Jicong 已提交
173
}
L
Liu Jicong 已提交
174

L
Liu Jicong 已提交
175 176
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
  SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
177
  streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
L
Liu Jicong 已提交
178
  return 0;
L
Liu Jicong 已提交
179 180
}

L
Liu Jicong 已提交
181
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
182 183
  SStreamTaskRunReq *pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
L
Liu Jicong 已提交
184
  SStreamTask       *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
185 186
  if (pTask) {
    streamProcessRunReq(pTask);
L
Liu Jicong 已提交
187
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
188 189 190 191
    return 0;
  } else {
    return -1;
  }
L
Liu Jicong 已提交
192 193
}

L
Liu Jicong 已提交
194 195 196 197
int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
  char              *msgStr = pMsg->pCont;
  char              *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
L
Liu Jicong 已提交
198 199
  SStreamDispatchReq req;
  SDecoder           decoder;
L
Liu Jicong 已提交
200
  tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
L
Liu Jicong 已提交
201
  tDecodeStreamDispatchReq(&decoder, &req);
L
Liu Jicong 已提交
202 203
  int32_t taskId = req.taskId;

L
Liu Jicong 已提交
204
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
205
  if (pTask) {
206 207
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
L
Liu Jicong 已提交
208
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
209 210 211
    return 0;
  } else {
    return -1;
L
Liu Jicong 已提交
212 213 214
  }
}

L
Liu Jicong 已提交
215
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
216 217 218 219 220 221 222
  char              *msgStr = pMsg->pCont;
  char              *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
L
Liu Jicong 已提交
223
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
224
  int32_t      taskId = req.dstTaskId;
L
Liu Jicong 已提交
225
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
H
Haojun Liao 已提交
226

L
Liu Jicong 已提交
227
  if (pTask) {
H
Haojun Liao 已提交
228
    SRpcMsg rsp = { .info = pMsg->info, .code = 0};
L
Liu Jicong 已提交
229
    streamProcessRetrieveReq(pTask, &req, &rsp);
L
Liu Jicong 已提交
230
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
231
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
232
    return 0;
L
Liu Jicong 已提交
233 234 235 236 237 238 239
  } else {
    return -1;
  }
}

int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
L
Liu Jicong 已提交
240
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);
L
Liu Jicong 已提交
241
  SStreamTask        *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
L
Liu Jicong 已提交
242
  if (pTask) {
243
    streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
L
Liu Jicong 已提交
244
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
245 246 247
    return 0;
  } else {
    return -1;
L
Liu Jicong 已提交
248 249 250 251
  }
  return 0;
}

L
Liu Jicong 已提交
252
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
253 254 255 256
  //
  return 0;
}

L
Liu Jicong 已提交
257 258 259
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
L
Liu Jicong 已提交
260 261
  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_DEPLOY:
L
Liu Jicong 已提交
262
      return sndProcessTaskDeployReq(pSnode, pReq, len);
L
Liu Jicong 已提交
263
    case TDMT_STREAM_TASK_DROP:
L
Liu Jicong 已提交
264
      return sndProcessTaskDropReq(pSnode, pReq, len);
L
Liu Jicong 已提交
265
    default:
S
shm  
Shengliang Guan 已提交
266
      ASSERT(0);
L
Liu Jicong 已提交
267
  }
L
Liu Jicong 已提交
268
  return 0;
L
Liu Jicong 已提交
269 270
}

L
Liu Jicong 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283
int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, msg, msgLen);
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  // find task
L
Liu Jicong 已提交
284
  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId);
L
Liu Jicong 已提交
285 286 287 288 289
  if (pTask == NULL) {
    return -1;
  }
  // do process request
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
L
Liu Jicong 已提交
290
    streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
291 292 293
    return -1;
  }

L
Liu Jicong 已提交
294
  streamMetaReleaseTask(pSnode->pMeta, pTask);
L
Liu Jicong 已提交
295 296 297 298 299 300 301 302
  return 0;
}

int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
  //
  return 0;
}

L
Liu Jicong 已提交
303
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
304 305 306 307
  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_RUN:
      return sndProcessTaskRunReq(pSnode, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
308
      return sndProcessTaskDispatchReq(pSnode, pMsg, true);
L
Liu Jicong 已提交
309 310
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return sndProcessTaskDispatchRsp(pSnode, pMsg);
L
Liu Jicong 已提交
311 312
    case TDMT_STREAM_RETRIEVE:
      return sndProcessTaskRetrieveReq(pSnode, pMsg);
L
Liu Jicong 已提交
313
    case TDMT_STREAM_RETRIEVE_RSP:
5
54liuyao 已提交
314
      return sndProcessTaskRetrieveRsp(pSnode, pMsg);
L
Liu Jicong 已提交
315 316 317 318
    case TDMT_STREAM_RECOVER_FINISH:
      return sndProcessTaskRecoverFinishReq(pSnode, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
L
Liu Jicong 已提交
319 320 321 322
    default:
      ASSERT(0);
  }
  return 0;
L
Liu Jicong 已提交
323
}