/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "qworker.h"

/*
 * Credentials of the dnode's internal RPC client. dmInitClient() passes
 * INTERNAL_USER and INTERNAL_CKEY to the RPC layer as `user` and `ckey`,
 * and feeds INTERNAL_SECRET through taosEncryptPass_c() to build `secret`.
 */
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
#define INTERNAL_SECRET "_pwd"
/*
 * Build a node message from a raw RPC message.
 *
 * pMsg receives a copy of *pRpc plus the user name, client IP and client port
 * of the connection. The connection info is only looked up for requests; for
 * any other message the zero-initialized info is copied instead.
 *
 * Returns 0 on success. If the lookup fails, terrno is set to
 * TSDB_CODE_MND_NO_USER_FROM_CONN, pMsg is left untouched and -1 is returned.
 */
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo conn = {0};

  if (IsReq(pRpc)) {
    if (rpcGetConnInfo(pRpc->info.handle, &conn) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
      return -1;
    }
  }

  *pMsg = *pRpc;
  pMsg->conn.clientPort = conn.clientPort;
  pMsg->conn.clientIp = conn.clientIp;
  memcpy(pMsg->conn.user, conn.user, TSDB_USER_LEN);
  return 0;
}

/*
 * Dispatch a message to the handler the wrapper registered for its type.
 *
 * The handler is looked up in pWrapper->msgFps by TMSG_INDEX(msgType). When no
 * handler is registered, terrno is set to TSDB_CODE_MSG_NOT_PROCESSED and -1
 * is returned. Otherwise the wrapper is recorded in pMsg->info.wrapper and the
 * handler's return value is passed through.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];

  if (handler != NULL) {
    dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return handler(pWrapper->pMgmt, pMsg);
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return -1;
}

/*
 * Callback invoked for every message received by the dnode's RPC instances
 * (installed as `cfp` in dmInitClient() and dmInitServer()).
 *
 * Steps:
 *   1. Reset the per-message fields of pRpc->info.
 *   2. Handle the message types that are served directly here (net test,
 *      fetch/retrieve responses, and the server-status probe while the dnode
 *      is not running).
 *   3. Pick the target node wrapper from the handle table; when the handle is
 *      flagged needCheckVgId, the vgId in the message head may reroute the
 *      message to the QNODE or MNODE wrapper.
 *   4. Mark the wrapper, copy the message into a queue item and either push it
 *      to the child process queue or process it in place.
 *
 * Every exit path except a successful in-place dispatch frees pRpc->pCont
 * here. Failures on a request are answered with an error response.
 *
 * pEpSet is part of the callback signature and is not used.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  bool          needRelease = false;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
  SMgmtWrapper *pWrapper = NULL;

  dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
         pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
  pRpc->info.noResp = 0;
  pRpc->info.persistHandle = 0;
  pRpc->info.wrapper = NULL;
  pRpc->info.node = NULL;
  pRpc->info.rsp = NULL;
  pRpc->info.rspLen = 0;

  // Message types handled directly, regardless of the dnode status.
  if (pRpc->msgType == TDMT_DND_NET_TEST) {
    dmProcessNetTestReq(pDnode, pRpc);
    goto _OVER_JUST_FREE;
  } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
    qWorkerProcessFetchRsp(NULL, NULL, pRpc);
    goto _OVER_JUST_FREE;
  }

  // While the dnode is not running only the status probe is answered.
  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      goto _OVER_JUST_FREE;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
      goto _OVER_RSP_FREE;
    }
  }

  if (IsReq(pRpc) && pRpc->pCont == NULL) {
    terrno = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER_RSP_FREE;
  }

  if (pHandle->defaultNtype == NODE_END) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER_RSP_FREE;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    // The head is dereferenced below, so the payload must exist and hold a
    // complete SMsgHead; a shorter payload would be read out of bounds.
    if (pRpc->pCont == NULL || pRpc->contLen < (int32_t)sizeof(SMsgHead)) {
      terrno = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER_RSP_FREE;
    }

    SMsgHead *pHead = pRpc->pCont;
    int32_t   vgId = ntohl(pHead->vgId);
    if (vgId == QNODE_HANDLE) {
      pWrapper = &pDnode->wrappers[QNODE];
    } else if (vgId == MNODE_HANDLE) {
      pWrapper = &pDnode->wrappers[MNODE];
    }
  }

  if (dmMarkWrapper(pWrapper) != 0) {
    goto _OVER_RSP_FREE;
  } else {
    needRelease = true;
    pRpc->info.wrapper = pWrapper;
  }

  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) {
    goto _OVER;
  }

  if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
    goto _OVER;
  }

  if (InParentProc(pWrapper)) {
    code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
  } else {
    code = dmProcessNodeMsg(pWrapper, pMsg);
  }

_OVER:
  if (code == 0) {
    // After a successful push to the child queue the local item and payload
    // are released here; after an in-place dispatch nothing is freed here.
    if (pWrapper != NULL && InParentProc(pWrapper)) {
      dTrace("msg:%p, is freed after push to cqueue", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since %s", pMsg, terrstr());
    if (terrno != 0) code = terrno;

    if (IsReq(pRpc)) {
      // A missing/offline node is reported as a redirect for message types in
      // the (TDMT_MND_MSG, TDMT_VND_MSG) range.
      if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
        if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) {
          code = TSDB_CODE_NODE_REDIRECT;
        }
      }
      SRpcMsg rspMsg = {.code = code, .info = pRpc->info};
      tmsgSendRsp(&rspMsg);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  if (needRelease) {
    dmReleaseWrapper(pWrapper);
  }
  return;

_OVER_JUST_FREE:
  rpcFreeCont(pRpc->pCont);
  return;

_OVER_RSP_FREE:
  // NOTE(review): unlike the _OVER path, this path responds even when pRpc is
  // not a request - confirm that is intended.
  rpcFreeCont(pRpc->pCont);
  SRpcMsg simpleRsp = {.code = terrno, .info = pRpc->info};
  rpcSendResponse(&simpleRsp);
}

/*
 * Fill the dnode's message dispatch tables from the handler lists of every
 * node type in [DNODE, NODE_END).
 *
 * For each handler entry:
 *   - the entry's function is stored in the wrapper's msgFps table;
 *   - an entry with needCheckVgId set copies that flag to the shared handle;
 *   - an entry without it makes its node type the handle's defaultNtype.
 *
 * Returns 0 on success, or -1 as soon as a node type returns no handler array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandlers); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}

static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
204 205 206
  SDnode *pDnode = dmInstance();
  SEpSet  epSet = {0};
  dmGetMnodeEpSet(&pDnode->data, &epSet);
S
Shengliang Guan 已提交
207

S
Shengliang Guan 已提交
208
  dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
S
Shengliang Guan 已提交
209 210
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
211
    if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
S
Shengliang Guan 已提交
212 213 214 215 216
      epSet.inUse = (i + 1) % epSet.numOfEps;
    }

    epSet.eps[i].port = htons(epSet.eps[i].port);
  }
S
Shengliang Guan 已提交
217

dengyihao's avatar
dengyihao 已提交
218 219
  SMEpSet msg = {.epSet = epSet};
  int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
S
Shengliang Guan 已提交
220 221 222

  SRpcMsg rsp = {
      .code = TSDB_CODE_RPC_REDIRECT,
S
Shengliang Guan 已提交
223
      .info = pMsg->info,
S
Shengliang Guan 已提交
224 225 226 227 228
      .contLen = len,
  };
  rsp.pCont = rpcMallocCont(len);
  tSerializeSMEpSet(rsp.pCont, len, &msg);
  rpcSendResponse(&rsp);
S
Shengliang Guan 已提交
229 230

  rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
231 232
}

/*
 * Send pReq to pEpSet through the dnode's client RPC, filling pRsp via
 * rpcSendRecv().
 *
 * When the dnode is not running nothing is sent: pRsp->code becomes
 * TSDB_CODE_NODE_OFFLINE and the request payload is freed and cleared.
 */
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING) {
    rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
    return;
  }

  pRsp->code = TSDB_CODE_NODE_OFFLINE;
  rpcFreeCont(pReq->pCont);
  pReq->pCont = NULL;
}

/*
 * Send a request to pEpSet through the dnode's client RPC.
 *
 * Returns 0 once the request has been handed to rpcSendRequest(). When the
 * dnode is not running, the payload is freed and cleared, terrno is set to
 * TSDB_CODE_NODE_OFFLINE and -1 is returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING) {
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  terrno = TSDB_CODE_NODE_OFFLINE;
  dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
  return -1;
}

/*
 * Deliver a response message.
 *
 * A response whose code is TSDB_CODE_NODE_REDIRECT is replaced by a redirect
 * response built by dmSendRpcRedirectRsp(). Any other response is pushed to
 * the parent queue when running in a child process, or sent over RPC directly.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;

  if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
    dmSendRpcRedirectRsp(pMsg);
    return;
  }

  if (InChildProc(pWrapper)) {
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
    return;
  }

  rpcSendResponse(pMsg);
}
/*
 * Answer pMsg with TSDB_CODE_RPC_REDIRECT, carrying *pNewEpSet as the body.
 *
 * In a child process pMsg is pushed to the parent queue as DND_FUNC_RSP;
 * pNewEpSet is not used on that path. Otherwise the endpoint set is
 * serialized into a fresh RPC buffer and sent directly.
 *
 * If the endpoint set cannot be sized or its buffer cannot be allocated, the
 * redirect is still sent, with no body, instead of serializing into NULL.
 */
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;
  if (InChildProc(pWrapper)) {
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
    return;
  }

  SRpcMsg rsp = {0};
  SMEpSet msg = {.epSet = *pNewEpSet};

  // First call only computes the encoded size.
  int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
  if (len >= 0) {
    rsp.pCont = rpcMallocCont(len);
  }
  if (rsp.pCont != NULL) {
    tSerializeSMEpSet(rsp.pCont, len, &msg);
    rsp.contLen = len;
  }

  rsp.code = TSDB_CODE_RPC_REDIRECT;
  rsp.info = pMsg->info;
  rpcSendResponse(&rsp);
}

/*
 * Register pMsg as the broken-link argument of its connection.
 *
 * In a child process the message is pushed to the parent queue as
 * DND_FUNC_REGIST; otherwise rpcRegisterBrokenLinkArg() is called directly.
 */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;

  if (InChildProc(pWrapper)) {
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

/*
 * Release an RPC handle.
 *
 * In a child process a message carrying `type` in its code field and a copy of
 * *pHandle is pushed to the parent queue as DND_FUNC_RELEASE; otherwise
 * rpcReleaseHandle() is called directly.
 */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  SMgmtWrapper *pWrapper = pHandle->wrapper;

  if (!InChildProc(pWrapper)) {
    rpcReleaseHandle(pHandle->handle, type);
    return;
  }

  SRpcMsg msg = {.code = type, .info = *pHandle};
  dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
}

/* Predicate installed as the client RPC's `rfp`: true only for TSDB_CODE_RPC_REDIRECT. */
static bool rpcRfp(int32_t code) {
  if (code == TSDB_CODE_RPC_REDIRECT) {
    return true;
  }
  return false;
}
/*
 * Open the dnode's client-side RPC instance and store it in
 * pDnode->trans.clientRpc.
 *
 * The client uses the internal credentials (INTERNAL_USER / INTERNAL_CKEY and
 * the encrypted INTERNAL_SECRET), delivers messages to dmProcessRpcMsg and
 * uses rpcRfp as its `rfp` callback.
 *
 * Returns 0 on success, -1 if rpcOpen() fails.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);

  SRpcInit rpcInit = {
      .label = "DND",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .user = INTERNAL_USER,
      .ckey = INTERNAL_CKEY,
      .spi = 1,
      .parent = pDnode,
      .rfp = rpcRfp,
      .secret = pass,
  };

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client");
  return -1;
}

/* Close the dnode's client RPC instance, if one is open, and clear the pointer. */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

/*
 * Open the dnode's server-side RPC instance on tsLocalFqdn:tsServerPort and
 * store it in pDnode->trans.serverRpc. Incoming messages are delivered to
 * dmProcessRpcMsg.
 *
 * Returns 0 on success, -1 if rpcOpen() fails.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  // Bound the copy by the destination, not the source length. rpcInit is
  // zero-initialized, so leaving the last byte untouched keeps the string
  // NUL-terminated even when tsLocalFqdn is truncated.
  strncpy(rpcInit.localFqdn, tsLocalFqdn, sizeof(rpcInit.localFqdn) - 1);
  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

/* Close the dnode's server RPC instance, if one is open, and clear the pointer. */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

/*
 * Build the message-callback table for this dnode: the client RPC handle plus
 * the send/redirect/register/release/report callbacks. Fields not listed here
 * are zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {0};

  msgCb.clientRpc = pDnode->trans.clientRpc;
  msgCb.sendReqFp = dmSendReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.sendRedirectRspFp = dmSendRedirectRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;
  msgCb.reportStartupFp = dmReportStartup;

  return msgCb;
}