dmTransport.c 10.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
18 19
#include "qworker.h"

20 21
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
22
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
25
  SRpcConnInfo connInfo = {0};
S
Shengliang Guan 已提交
26
  if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) {
S
Shengliang Guan 已提交
27
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
S
Shengliang Guan 已提交
28
    dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
S
Shengliang Guan 已提交
29 30 31
    return -1;
  }

S
Shengliang Guan 已提交
32 33 34 35
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  memcpy(pMsg->conn.user, connInfo.user, TSDB_USER_LEN);
  pMsg->conn.clientIp = connInfo.clientIp;
  pMsg->conn.clientPort = connInfo.clientPort;
S
Shengliang Guan 已提交
36 37 38
  return 0;
}

S
Shengliang Guan 已提交
39 40
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
S
Shengliang Guan 已提交
41 42 43 44
  if (msgFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46
  dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
S
Shengliang Guan 已提交
47
  pMsg->info.wrapper = pWrapper;
S
Shengliang Guan 已提交
48 49 50 51
  return (*msgFp)(pWrapper->pMgmt, pMsg);
}

static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
52
  SDnodeTrans * pTrans = &pDnode->trans;
S
Shengliang Guan 已提交
53
  int32_t       code = -1;
dengyihao's avatar
dengyihao 已提交
54
  SRpcMsg *     pMsg = NULL;
S
Shengliang Guan 已提交
55
  bool          needRelease = false;
56
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
S
Shengliang Guan 已提交
57 58
  SMgmtWrapper *pWrapper = NULL;

S
Shengliang Guan 已提交
59 60
  dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
         pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
61 62 63 64 65 66
  pRpc->info.noResp = 0;
  pRpc->info.persistHandle = 0;
  pRpc->info.wrapper = NULL;
  pRpc->info.node = NULL;
  pRpc->info.rsp = NULL;
  pRpc->info.rspLen = 0;
67

S
Shengliang Guan 已提交
68
  if (pRpc->msgType == TDMT_DND_NET_TEST) {
S
Shengliang Guan 已提交
69
    dmProcessNetTestReq(pDnode, pRpc);
70
    goto _OVER_JUST_FREE;
S
Shengliang Guan 已提交
71
  } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
72 73
    qWorkerProcessFetchRsp(NULL, NULL, pRpc);
    goto _OVER_JUST_FREE;
S
Shengliang Guan 已提交
74 75 76
  } else {
  }

S
Shengliang Guan 已提交
77
  if (pDnode->status != DND_STAT_RUNNING) {
S
Shengliang Guan 已提交
78
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
S
Shengliang Guan 已提交
79
      dmProcessServerStartupStatus(pDnode, pRpc);
80
      goto _OVER_JUST_FREE;
S
Shengliang Guan 已提交
81
    } else {
82 83
      terrno = TSDB_CODE_APP_NOT_READY;
      goto _OVER_RSP_FREE;
S
Shengliang Guan 已提交
84 85 86
    }
  }

S
Shengliang Guan 已提交
87
  if (IsReq(pRpc) && pRpc->pCont == NULL) {
S
Shengliang Guan 已提交
88
    terrno = TSDB_CODE_INVALID_MSG_LEN;
89
    goto _OVER_RSP_FREE;
S
Shengliang Guan 已提交
90 91 92 93
  }

  if (pHandle->defaultNtype == NODE_END) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
94
    goto _OVER_RSP_FREE;
S
Shengliang Guan 已提交
95 96 97
  } else {
    pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
    if (pHandle->needCheckVgId) {
98 99 100 101 102 103 104 105 106
      if (pRpc->contLen > 0) {
        SMsgHead *pHead = pRpc->pCont;
        int32_t   vgId = ntohl(pHead->vgId);
        if (vgId == QNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[QNODE];
        } else if (vgId == MNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[MNODE];
        } else {
        }
S
Shengliang Guan 已提交
107
      } else {
108
        terrno = TSDB_CODE_INVALID_MSG_LEN;
109
        goto _OVER_RSP_FREE;
S
Shengliang Guan 已提交
110 111 112
      }
    }
  }
S
Shengliang Guan 已提交
113

S
Shengliang Guan 已提交
114
  if (dmMarkWrapper(pWrapper) != 0) {
115
    goto _OVER_RSP_FREE;
S
Shengliang Guan 已提交
116 117
  } else {
    needRelease = true;
118
    pRpc->info.wrapper = pWrapper;
S
Shengliang Guan 已提交
119
  }
S
Shengliang Guan 已提交
120

S
Shengliang Guan 已提交
121
  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
S
Shengliang Guan 已提交
122 123 124
  if (pMsg == NULL) {
    goto _OVER;
  }
S
Shengliang Guan 已提交
125

S
Shengliang Guan 已提交
126 127 128 129
  if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
    goto _OVER;
  }

130
  if (InParentProc(pWrapper)) {
131
    code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
S
Shengliang Guan 已提交
132 133 134
  } else {
    code = dmProcessNodeMsg(pWrapper, pMsg);
  }
S
Shengliang Guan 已提交
135 136 137

_OVER:
  if (code == 0) {
138
    if (pWrapper != NULL && InParentProc(pWrapper)) {
S
Shengliang Guan 已提交
139
      dTrace("msg:%p, is freed after push to cqueue", pMsg);
S
Shengliang Guan 已提交
140 141 142 143
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
S
Shengliang Guan 已提交
144 145 146
    dError("msg:%p, failed to process since %s", pMsg, terrstr());
    if (terrno != 0) code = terrno;

S
Shengliang Guan 已提交
147
    if (IsReq(pRpc)) {
148
      if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
S
Shengliang Guan 已提交
149
        if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) {
150 151 152
          code = TSDB_CODE_NODE_REDIRECT;
        }
      }
S
Shengliang Guan 已提交
153
      SRpcMsg rspMsg = {.code = code, .info = pRpc->info};
S
Shengliang Guan 已提交
154
      tmsgSendRsp(&rspMsg);
S
Shengliang Guan 已提交
155
    }
S
Shengliang Guan 已提交
156

S
Shengliang Guan 已提交
157 158 159 160 161
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

162 163 164
  if (needRelease) {
    dmReleaseWrapper(pWrapper);
  }
165 166 167 168 169 170 171 172 173 174
  return;

_OVER_JUST_FREE:
  rpcFreeCont(pRpc->pCont);
  return;

_OVER_RSP_FREE:
  rpcFreeCont(pRpc->pCont);
  SRpcMsg simpleRsp = {.code = terrno, .info = pRpc->info};
  rpcSendResponse(&simpleRsp);
S
Shengliang Guan 已提交
175 176
}

S
Shengliang Guan 已提交
177
int32_t dmInitMsgHandle(SDnode *pDnode) {
S
Shengliang Guan 已提交
178
  SDnodeTrans *pTrans = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
179

S
Shengliang 已提交
180 181
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
dengyihao's avatar
dengyihao 已提交
182
    SArray *      pArray = (*pWrapper->func.getHandlesFp)();
S
Shengliang Guan 已提交
183 184 185
    if (pArray == NULL) return -1;

    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
dengyihao's avatar
dengyihao 已提交
186
      SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
187
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
S
Shengliang Guan 已提交
188 189 190 191
      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      }
      if (!pMgmt->needCheckVgId) {
S
Shengliang 已提交
192
        pHandle->defaultNtype = ntype;
S
shm  
Shengliang Guan 已提交
193
      }
S
Shengliang Guan 已提交
194
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
S
shm  
Shengliang Guan 已提交
195
    }
S
Shengliang Guan 已提交
196 197

    taosArrayDestroy(pArray);
S
shm  
Shengliang Guan 已提交
198 199 200 201 202
  }

  return 0;
}

S
Shengliang Guan 已提交
203
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
204 205
  SDnode *pDnode = dmInstance();
  if (pDnode->status != DND_STAT_RUNNING) {
S
Shengliang Guan 已提交
206 207
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
208
    terrno = TSDB_CODE_NODE_OFFLINE;
S
Shengliang Guan 已提交
209
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
S
shm  
Shengliang Guan 已提交
210
    return -1;
211
  } else {
S
Shengliang Guan 已提交
212
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
213
    return 0;
S
shm  
Shengliang Guan 已提交
214 215 216
  }
}

217
static inline void dmSendRsp(SRpcMsg *pMsg) {
218
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;
S
Shengliang Guan 已提交
219 220 221 222
  if (InChildProc(pWrapper)) {
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
223
  } else {
S
Shengliang Guan 已提交
224
    rpcSendResponse(pMsg);
S
shm  
Shengliang Guan 已提交
225
  }
D
dapan1121 已提交
226
}
S
Shengliang Guan 已提交
227

228
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
S
Shengliang Guan 已提交
229 230 231 232 233 234 235
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
  SMEpSet msg = {.epSet = *pNewEpSet};
  int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
236
  } else {
S
Shengliang Guan 已提交
237 238
    tSerializeSMEpSet(rsp.pCont, contLen, &msg);
    rsp.contLen = contLen;
M
Minghao Li 已提交
239
  }
S
Shengliang Guan 已提交
240 241 242
  dmSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
M
Minghao Li 已提交
243 244
}

245 246
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;
247
  if (InChildProc(pWrapper)) {
248
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
S
Shengliang Guan 已提交
249 250
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
251 252
  } else {
    rpcRegisterBrokenLinkArg(pMsg);
S
shm  
Shengliang Guan 已提交
253 254 255
  }
}

256 257
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  SMgmtWrapper *pWrapper = pHandle->wrapper;
258
  if (InChildProc(pWrapper)) {
259
    SRpcMsg msg = {.code = type, .info = *pHandle};
260
    dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
261
  } else {
262
    rpcReleaseHandle(pHandle->handle, type);
S
Shengliang Guan 已提交
263
  }
S
shm  
Shengliang Guan 已提交
264 265
}

266
static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; }
M
Minghao Li 已提交
267

268
int32_t dmInitClient(SDnode *pDnode) {
S
Shengliang Guan 已提交
269 270 271 272 273
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DND";
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
274
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
S
Shengliang Guan 已提交
275 276 277 278 279
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = INTERNAL_USER;
  rpcInit.parent = pDnode;
M
Minghao Li 已提交
280
  rpcInit.rfp = rpcRfp;
S
Shengliang Guan 已提交
281 282 283 284 285 286 287 288 289 290 291

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

292
void dmCleanupClient(SDnode *pDnode) {
S
Shengliang Guan 已提交
293 294 295 296 297 298 299 300
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc) {
    rpcClose(pTrans->clientRpc);
    pTrans->clientRpc = NULL;
    dDebug("dnode rpc client is closed");
  }
}

301
int32_t dmInitServer(SDnode *pDnode) {
S
Shengliang Guan 已提交
302 303 304
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
305 306
  strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
  rpcInit.localPort = tsServerPort;
S
Shengliang Guan 已提交
307 308
  rpcInit.label = "DND";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
S
Shengliang Guan 已提交
309
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
S
Shengliang Guan 已提交
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

325
void dmCleanupServer(SDnode *pDnode) {
S
Shengliang Guan 已提交
326 327 328 329 330 331 332 333
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc) {
    rpcClose(pTrans->serverRpc);
    pTrans->serverRpc = NULL;
    dDebug("dnode rpc server is closed");
  }
}

334 335 336 337 338 339 340 341 342
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {
      .clientRpc = pDnode->trans.clientRpc,
      .sendReqFp = dmSendReq,
      .sendRspFp = dmSendRsp,
      .sendRedirectRspFp = dmSendRedirectRsp,
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
      .releaseHandleFp = dmReleaseHandle,
      .reportStartupFp = dmReportStartup,
S
Shengliang Guan 已提交
343 344
  };
  return msgCb;
dengyihao's avatar
dengyihao 已提交
345
}
S
Shengliang Guan 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376

static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();
  SEpSet  epSet = {0};
  dmGetMnodeEpSet(&pDnode->data, &epSet);

  dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
      epSet.inUse = (i + 1) % epSet.numOfEps;
    }

    epSet.eps[i].port = htons(epSet.eps[i].port);
  }

  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
  SMEpSet msg = {.epSet = epSet};
  int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSMEpSet(rsp.pCont, contLen, &msg);
    rsp.contLen = contLen;
  }

  dmSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
}