dmTransport.c 9.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18
#include "qworker.h"
19

S
Shengliang Guan 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/*
 * Deliver a response message.
 *
 * If the wrapper recorded in pMsg->info.wrapper is running in a child
 * process, the message is put on that wrapper's proc queue tagged
 * DND_FUNC_RSP; otherwise it is handed directly to rpcSendResponse().
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  SMgmtWrapper *pOwner = pMsg->info.wrapper;

  if (InChildProc(pOwner)) {
    dmPutToProcPQueue(&pOwner->proc, pMsg, DND_FUNC_RSP);
    return;
  }

  rpcSendResponse(pMsg);
}

/*
 * Fill pMsg with a serialized mnode endpoint set for a redirect reply.
 *
 * The endpoint set is obtained from dmGetMnodeEpSetForRedirect(), its
 * serialized size is measured with a NULL-buffer call to tSerializeSEpSet(),
 * and the payload is written into a buffer from rpcMallocCont().
 * If that allocation fails, pMsg->code is set to TSDB_CODE_OUT_OF_MEMORY
 * and pMsg->contLen is left untouched.
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet mnodeEps = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &mnodeEps);

  const int32_t bufLen = tSerializeSEpSet(NULL, 0, &mnodeEps);

  pMsg->pCont = rpcMallocCont(bufLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  tSerializeSEpSet(pMsg->pCont, bufLen, &mnodeEps);
  pMsg->contLen = bufLen;
}

/*
 * Answer pMsg with a TSDB_CODE_RPC_REDIRECT response carrying pNewEpSet.
 *
 * pMsg->info.hasEpSet is set to 1 before the info is copied into the
 * response. The serialized endpoint set is placed in a buffer obtained from
 * rpcMallocCont(). After the response has been handed to dmSendRsp(), the
 * content of the original message is released and pMsg->pCont is cleared.
 *
 * On allocation failure the response is still sent, but with
 * TSDB_CODE_OUT_OF_MEMORY instead of a redirect that has no payload.
 */
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
  pMsg->info.hasEpSet = 1;
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
  int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    // The error must be recorded on the message that is actually sent (rsp);
    // pMsg->code is kept in sync as the original code did.
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
    rsp.contLen = contLen;
  }

  dmSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
}
59

S
Shengliang Guan 已提交
60 61
/*
 * Dispatch pMsg to the handler that pWrapper registered for its message type.
 *
 * Returns the handler's return value, or -1 with terrno set to
 * TSDB_CODE_MSG_NOT_PROCESSED when no handler is registered.
 * pMsg->info.wrapper is set to pWrapper before the handler is invoked.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];

  if (handler != NULL) {
    // NOTE(review): `trace` has no other use here; presumably the dGTrace
    // macro reads it, so the name must stay as is.
    const STraceId *trace = &pMsg->info.traceId;
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);

    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return -1;
}

/*
 * Entry point for every rpc message (installed as the `cfp` callback of the
 * client and server rpc instances in this file).
 *
 * Flow:
 *   1. A few message types are handled inline and return immediately.
 *   2. While the dnode is not running, only TDMT_DND_SERVER_STATUS is served;
 *      everything else fails with TSDB_CODE_APP_NOT_READY.
 *   3. The target wrapper is chosen from the handle table; when the handle
 *      requires a vgId check, the vgId in the message head may redirect the
 *      message to the QNODE / SNODE / MNODE wrapper.
 *   4. The message is copied into a queue item and either put on the proc
 *      queue (parent process) or processed directly.
 *
 * On failure a response is sent for requests, the queue item (if any) is
 * freed, and pRpc->pCont is released. The wrapper is released in all paths
 * that reach _OVER.
 *
 * pDnode: the dnode instance (the rpc `parent`).
 * pRpc:   the incoming message.
 * pEpSet: may be NULL; when present on TDMT_MND_STATUS_RSP it is stored as
 *         the mnode endpoint set.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
      qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    default:
      break;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
      goto _OVER;
    }
  }

  if (IsReq(pRpc) && pRpc->pCont == NULL) {
    terrno = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  }

  if (pHandle->defaultNtype == NODE_END) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    // The vgId is read from an SMsgHead at the start of the payload, so the
    // payload must be at least that large; shorter messages are rejected
    // instead of being read out of bounds.
    if (pRpc->contLen >= (int32_t)sizeof(SMsgHead)) {
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          // any other vgId keeps the handle's default wrapper
          break;
      }
    } else {
      terrno = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if (dmMarkWrapper(pWrapper) != 0) {
    // Marking failed: clear the pointer so the error path responds via
    // rpcSendResponse() and dmReleaseWrapper() receives NULL.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;
  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);

  if (InParentProc(pWrapper)) {
    code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
  } else {
    code = dmProcessNodeMsg(pWrapper, pMsg);
  }

_OVER:
  if (code != 0) {
    // Prefer the specific error recorded in terrno over the generic -1.
    if (terrno != 0) code = terrno;
    dGTrace("msg:%p, failed to process since %s", pMsg, terrstr());

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info};
      if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
          pRpc->msgType < TDMT_VND_MSG) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      if (pWrapper != NULL) {
        dmSendRsp(&rsp);
      } else {
        rpcSendResponse(&rsp);
      }
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}

S
Shengliang Guan 已提交
194
/*
 * Build the dnode-wide message dispatch tables.
 *
 * For every node type from DNODE up to (not including) NODE_END, the
 * wrapper's getHandlesFp() is called to obtain its handler list. Each entry:
 *   - sets needCheckVgId on the shared handle when the entry asks for it,
 *     otherwise records this node type as the handle's default target;
 *   - stores the handler function in the wrapper's own msgFps table.
 *
 * Returns 0 on success, -1 as soon as a wrapper returns a NULL handler list.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    int32_t idx = 0;
    while (idx < taosArrayGetSize(pHandlers)) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
      ++idx;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}

S
Shengliang Guan 已提交
220
/*
 * Send a request through the dnode's client rpc.
 *
 * Returns 0 after handing the message to rpcSendRequest(). If the dnode is
 * not in DND_STAT_RUNNING, the message content is freed, pMsg->pCont is
 * cleared, terrno is set to TSDB_CODE_NODE_OFFLINE and -1 is returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING) {
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  terrno = TSDB_CODE_NODE_OFFLINE;
  dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
  return -1;
}

234 235
/*
 * Register pMsg as a broken-link argument.
 *
 * In a child process the message is put on the wrapper's proc queue tagged
 * DND_FUNC_REGIST; otherwise rpcRegisterBrokenLinkArg() is called directly.
 */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  SMgmtWrapper *pOwner = pMsg->info.wrapper;

  if (InChildProc(pOwner)) {
    dmPutToProcPQueue(&pOwner->proc, pMsg, DND_FUNC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

243 244
/*
 * Release an rpc handle.
 *
 * In a child process a temporary message carrying `type` in its code field
 * and a copy of *pHandle as its info is put on the wrapper's proc queue
 * tagged DND_FUNC_RELEASE; otherwise rpcReleaseHandle() is called directly.
 */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  SMgmtWrapper *pOwner = pHandle->wrapper;

  if (!(InChildProc(pOwner))) {
    rpcReleaseHandle(pHandle, type);
    return;
  }

  SRpcMsg releaseMsg = {.code = type, .info = *pHandle};
  dmPutToProcPQueue(&pOwner->proc, &releaseMsg, DND_FUNC_RELEASE);
}

dengyihao's avatar
dengyihao 已提交
253
/*
 * Predicate installed as the client rpc's `rfp` callback.
 *
 * Returns true when `code` is one of the listed redirect / unavailable /
 * not-ready codes AND the message is not a scheduler query, merge-query or
 * fetch message; returns false in every other case.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool codeMatches = (code == TSDB_CODE_RPC_REDIRECT) || (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) ||
                           (code == TSDB_CODE_NODE_NOT_DEPLOYED) || (code == TSDB_CODE_SYN_NOT_LEADER) ||
                           (code == TSDB_CODE_APP_NOT_READY) || (code == TSDB_CODE_RPC_BROKEN_LINK);
  if (!codeMatches) {
    return false;
  }

  const bool isSchedMsg =
      (msgType == TDMT_SCH_QUERY) || (msgType == TDMT_SCH_MERGE_QUERY) || (msgType == TDMT_SCH_FETCH);
  return !isSchedMsg;
}
M
Minghao Li 已提交
264

265
/*
 * Open the dnode's rpc client and store it in pDnode->trans.clientRpc.
 *
 * The client uses one thread, 1024 sessions, dmProcessRpcMsg as its message
 * callback and rpcRfp as its `rfp` callback.
 * Returns 0 on success, -1 if rpcOpen() fails.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fields not named here are zero-initialized.
  SRpcInit clientCfg = {
      .label = "DND-C",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
  };

  pTrans->clientRpc = rpcOpen(&clientCfg);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client");
  return -1;
}

288
/*
 * Close the dnode's rpc client if one is open and clear the stored handle.
 * Does nothing when pDnode->trans.clientRpc is already NULL.
 */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

297
/*
 * Open the dnode's rpc server and store it in pDnode->trans.serverRpc.
 *
 * The server is configured with tsLocalFqdn / tsServerPort, uses
 * tsNumOfRpcThreads threads and tsMaxShellConns sessions, and uses
 * dmProcessRpcMsg as its message callback.
 * Returns 0 on success, -1 if rpcOpen() fails.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  // Bound the copy by the destination size, not the source length, so an
  // over-long tsLocalFqdn cannot overflow the buffer. rpcInit is
  // zero-initialized and the last byte is never written, so the result is
  // always NUL-terminated.
  strncpy(rpcInit.localFqdn, tsLocalFqdn, sizeof(rpcInit.localFqdn) - 1);
  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

321
/*
 * Close the dnode's rpc server if one is open and clear the stored handle.
 * Does nothing when pDnode->trans.serverRpc is already NULL.
 */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

330 331 332 333 334 335 336 337 338
/*
 * Build the message-callback table that exposes this file's transport
 * helpers together with the dnode's client rpc handle.
 * The table is returned by value; fields not assigned here are zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb callbacks = {0};

  callbacks.clientRpc = pDnode->trans.clientRpc;
  callbacks.sendReqFp = dmSendReq;
  callbacks.sendRspFp = dmSendRsp;
  callbacks.sendRedirectRspFp = dmSendRedirectRsp;
  callbacks.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dmReleaseHandle;
  callbacks.reportStartupFp = dmReportStartup;

  return callbacks;
}