dmTransport.c 9.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18
#include "qworker.h"
19

20 21 22
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
static void dmSendRsp(SRpcMsg *pMsg);
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
23

S
Shengliang Guan 已提交
24
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
dengyihao's avatar
dengyihao 已提交
25 26 27 28 29 30
  SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
  // if (IsReq(pRpc)) {
  //  terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
  //  dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
  //  return -1;
  //}
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dengyihao's avatar
dengyihao 已提交
33 34 35
  memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
  pMsg->conn.clientIp = pConnInfo->clientIp;
  pMsg->conn.clientPort = pConnInfo->clientPort;
S
Shengliang Guan 已提交
36 37 38
  return 0;
}

S
Shengliang Guan 已提交
39 40
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
S
Shengliang Guan 已提交
41 42 43 44
  if (msgFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46
  dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
S
Shengliang Guan 已提交
47
  pMsg->info.wrapper = pWrapper;
S
Shengliang Guan 已提交
48 49 50 51
  return (*msgFp)(pWrapper->pMgmt, pMsg);
}

static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
52
  SDnodeTrans * pTrans = &pDnode->trans;
S
Shengliang Guan 已提交
53
  int32_t       code = -1;
dengyihao's avatar
dengyihao 已提交
54
  SRpcMsg *     pMsg = NULL;
S
Shengliang Guan 已提交
55
  SMgmtWrapper *pWrapper = NULL;
S
Shengliang Guan 已提交
56
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
S
Shengliang Guan 已提交
57

S
Shengliang Guan 已提交
58 59
  dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
         pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
60

S
Shengliang Guan 已提交
61
  if (pRpc->msgType == TDMT_DND_NET_TEST) {
S
Shengliang Guan 已提交
62
    dmProcessNetTestReq(pDnode, pRpc);
S
Shengliang Guan 已提交
63
    return;
S
Shengliang Guan 已提交
64
  } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
D
dapan1121 已提交
65
    qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
S
Shengliang Guan 已提交
66
    return;
67 68
  } else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) {
    dmSetMnodeEpSet(&pDnode->data, pEpSet);
S
Shengliang Guan 已提交
69 70 71
  } else {
  }

S
Shengliang Guan 已提交
72
  if (pDnode->status != DND_STAT_RUNNING) {
S
Shengliang Guan 已提交
73
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
S
Shengliang Guan 已提交
74
      dmProcessServerStartupStatus(pDnode, pRpc);
S
Shengliang Guan 已提交
75
      return;
S
Shengliang Guan 已提交
76
    } else {
77
      terrno = TSDB_CODE_APP_NOT_READY;
S
Shengliang Guan 已提交
78
      goto _OVER;
S
Shengliang Guan 已提交
79 80 81
    }
  }

S
Shengliang Guan 已提交
82
  if (IsReq(pRpc) && pRpc->pCont == NULL) {
S
Shengliang Guan 已提交
83
    terrno = TSDB_CODE_INVALID_MSG_LEN;
S
Shengliang Guan 已提交
84
    goto _OVER;
S
Shengliang Guan 已提交
85 86 87 88
  }

  if (pHandle->defaultNtype == NODE_END) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
S
Shengliang Guan 已提交
89
    goto _OVER;
S
Shengliang Guan 已提交
90 91 92
  } else {
    pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
    if (pHandle->needCheckVgId) {
93 94 95 96 97
      if (pRpc->contLen > 0) {
        SMsgHead *pHead = pRpc->pCont;
        int32_t   vgId = ntohl(pHead->vgId);
        if (vgId == QNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[QNODE];
98 99
        } else if (vgId == SNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[SNODE];
100 101 102 103
        } else if (vgId == MNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[MNODE];
        } else {
        }
S
Shengliang Guan 已提交
104
      } else {
105
        terrno = TSDB_CODE_INVALID_MSG_LEN;
S
Shengliang Guan 已提交
106
        goto _OVER;
S
Shengliang Guan 已提交
107 108 109
      }
    }
  }
S
Shengliang Guan 已提交
110

S
Shengliang Guan 已提交
111
  if (dmMarkWrapper(pWrapper) != 0) {
S
Shengliang Guan 已提交
112 113
    pWrapper = NULL;
    goto _OVER;
S
Shengliang Guan 已提交
114
  } else {
115
    pRpc->info.wrapper = pWrapper;
S
Shengliang Guan 已提交
116
  }
S
Shengliang Guan 已提交
117

S
Shengliang Guan 已提交
118
  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
S
Shengliang Guan 已提交
119 120 121
  if (pMsg == NULL) {
    goto _OVER;
  }
S
Shengliang Guan 已提交
122
  dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
S
Shengliang Guan 已提交
123

S
Shengliang Guan 已提交
124 125 126 127
  if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
    goto _OVER;
  }

128
  if (InParentProc(pWrapper)) {
129
    code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
S
Shengliang Guan 已提交
130 131 132
  } else {
    code = dmProcessNodeMsg(pWrapper, pMsg);
  }
S
Shengliang Guan 已提交
133 134

_OVER:
135
  if (code != 0) {
S
Shengliang Guan 已提交
136 137
    dTrace("failed to process msg:%p since %s, handle:%p", pMsg, terrstr(), pRpc->info.handle);

S
Shengliang Guan 已提交
138 139
    if (terrno != 0) code = terrno;

S
Shengliang Guan 已提交
140
    if (IsReq(pRpc)) {
141 142
      SRpcMsg rsp = {.code = code, .info = pRpc->info};

143 144
      if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
          pRpc->msgType < TDMT_VND_MSG) {
145 146 147 148 149
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      if (pWrapper != NULL) {
        dmSendRsp(&rsp);
S
Shengliang Guan 已提交
150
      } else {
151
        rpcSendResponse(&rsp);
S
Shengliang Guan 已提交
152
      }
S
Shengliang Guan 已提交
153
    }
S
Shengliang Guan 已提交
154

S
Shengliang Guan 已提交
155 156 157 158
    if (pMsg != NULL) {
      dTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
S
Shengliang Guan 已提交
159 160 161
    rpcFreeCont(pRpc->pCont);
  }

S
Shengliang Guan 已提交
162
  dmReleaseWrapper(pWrapper);
S
Shengliang Guan 已提交
163 164
}

S
Shengliang Guan 已提交
165
int32_t dmInitMsgHandle(SDnode *pDnode) {
S
Shengliang Guan 已提交
166
  SDnodeTrans *pTrans = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
167

S
Shengliang 已提交
168 169
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
dengyihao's avatar
dengyihao 已提交
170
    SArray *      pArray = (*pWrapper->func.getHandlesFp)();
S
Shengliang Guan 已提交
171 172 173
    if (pArray == NULL) return -1;

    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
dengyihao's avatar
dengyihao 已提交
174
      SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
175
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
S
Shengliang Guan 已提交
176 177 178 179
      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      }
      if (!pMgmt->needCheckVgId) {
S
Shengliang 已提交
180
        pHandle->defaultNtype = ntype;
S
shm  
Shengliang Guan 已提交
181
      }
S
Shengliang Guan 已提交
182
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
S
shm  
Shengliang Guan 已提交
183
    }
S
Shengliang Guan 已提交
184 185

    taosArrayDestroy(pArray);
S
shm  
Shengliang Guan 已提交
186 187 188 189 190
  }

  return 0;
}

S
Shengliang Guan 已提交
191
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
192 193
  SDnode *pDnode = dmInstance();
  if (pDnode->status != DND_STAT_RUNNING) {
S
Shengliang Guan 已提交
194 195
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
196
    terrno = TSDB_CODE_NODE_OFFLINE;
S
Shengliang Guan 已提交
197
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
S
shm  
Shengliang Guan 已提交
198
    return -1;
199
  } else {
S
Shengliang Guan 已提交
200
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
201
    return 0;
S
shm  
Shengliang Guan 已提交
202 203 204
  }
}

205
static inline void dmSendRsp(SRpcMsg *pMsg) {
206
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;
S
Shengliang Guan 已提交
207 208
  if (InChildProc(pWrapper)) {
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
209
  } else {
S
Shengliang Guan 已提交
210
    rpcSendResponse(pMsg);
S
shm  
Shengliang Guan 已提交
211
  }
D
dapan1121 已提交
212
}
S
Shengliang Guan 已提交
213

214
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
215 216
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
217

218
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
219 220 221 222
  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
223
    tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
224 225 226 227
    pMsg->contLen = contLen;
  }
}

228
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
S
Shengliang Guan 已提交
229
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
230
  int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
S
Shengliang Guan 已提交
231 232 233 234

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
235
  } else {
236
    tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
S
Shengliang Guan 已提交
237
    rsp.contLen = contLen;
M
Minghao Li 已提交
238
  }
S
Shengliang Guan 已提交
239 240 241
  dmSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
M
Minghao Li 已提交
242 243
}

244 245
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  SMgmtWrapper *pWrapper = pMsg->info.wrapper;
246
  if (InChildProc(pWrapper)) {
247
    dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
248 249
  } else {
    rpcRegisterBrokenLinkArg(pMsg);
S
shm  
Shengliang Guan 已提交
250 251 252
  }
}

253 254
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  SMgmtWrapper *pWrapper = pHandle->wrapper;
255
  if (InChildProc(pWrapper)) {
256
    SRpcMsg msg = {.code = type, .info = *pHandle};
257
    dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
258
  } else {
259
    rpcReleaseHandle(pHandle->handle, type);
S
Shengliang Guan 已提交
260
  }
S
shm  
Shengliang Guan 已提交
261 262
}

263
static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; }
M
Minghao Li 已提交
264

265
int32_t dmInitClient(SDnode *pDnode) {
S
Shengliang Guan 已提交
266 267 268 269 270
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DND";
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
271
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
S
Shengliang Guan 已提交
272 273 274 275
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
M
Minghao Li 已提交
276
  rpcInit.rfp = rpcRfp;
S
Shengliang Guan 已提交
277 278 279 280 281 282 283 284 285 286 287

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

288
void dmCleanupClient(SDnode *pDnode) {
S
Shengliang Guan 已提交
289 290 291 292 293 294 295 296
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc) {
    rpcClose(pTrans->clientRpc);
    pTrans->clientRpc = NULL;
    dDebug("dnode rpc client is closed");
  }
}

297
int32_t dmInitServer(SDnode *pDnode) {
S
Shengliang Guan 已提交
298 299 300
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
301 302
  strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
  rpcInit.localPort = tsServerPort;
S
Shengliang Guan 已提交
303 304
  rpcInit.label = "DND";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
S
Shengliang Guan 已提交
305
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
S
Shengliang Guan 已提交
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

321
void dmCleanupServer(SDnode *pDnode) {
S
Shengliang Guan 已提交
322 323 324 325 326 327 328 329
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc) {
    rpcClose(pTrans->serverRpc);
    pTrans->serverRpc = NULL;
    dDebug("dnode rpc server is closed");
  }
}

330 331 332 333 334 335 336 337 338
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {
      .clientRpc = pDnode->trans.clientRpc,
      .sendReqFp = dmSendReq,
      .sendRspFp = dmSendRsp,
      .sendRedirectRspFp = dmSendRedirectRsp,
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
      .releaseHandleFp = dmReleaseHandle,
      .reportStartupFp = dmReportStartup,
S
Shengliang Guan 已提交
339 340
  };
  return msgCb;
dengyihao's avatar
dengyihao 已提交
341
}