dmTransport.c 9.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18
#include "qworker.h"
19

S
Shengliang Guan 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/*
 * Hand a response message to its destination.
 *
 * The wrapper recorded in pMsg->info.wrapper decides the route: when
 * InChildProc() holds for it, the message is put on that wrapper's proc
 * queue tagged DND_FUNC_RSP; otherwise it goes straight to rpcSendResponse().
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  SMgmtWrapper *pOwner = pMsg->info.wrapper;

  if (InChildProc(pOwner)) {
    dmPutToProcPQueue(&pOwner->proc, pMsg, DND_FUNC_RSP);
    return;
  }

  rpcSendResponse(pMsg);
}

/*
 * Fill pMsg with a serialized endpoint set obtained from
 * dmGetMnodeEpSetForRedirect().
 *
 * On success pMsg->pCont / pMsg->contLen hold the serialized SEpSet.
 * If the content buffer cannot be allocated, pMsg->code is set to
 * TSDB_CODE_OUT_OF_MEMORY and pMsg->pCont stays NULL.
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet mnodeEps = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &mnodeEps);

  // First pass with a NULL buffer only computes the encoded length.
  const int32_t encodedLen = tSerializeSEpSet(NULL, 0, &mnodeEps);

  pMsg->pCont = rpcMallocCont(encodedLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  tSerializeSEpSet(pMsg->pCont, encodedLen, &mnodeEps);
  pMsg->contLen = encodedLen;
}

/*
 * Answer pMsg with a TSDB_CODE_RPC_REDIRECT response carrying pNewEpSet,
 * then free the request content.
 *
 * pMsg      - request being answered; its info (with hasEpSet set to 1) and
 *             msgType are copied into the response, and its pCont is freed
 *             and reset to NULL before returning.
 * pNewEpSet - endpoint set serialized into the response body.
 *
 * If the response buffer cannot be allocated, the response is still sent but
 * it carries TSDB_CODE_OUT_OF_MEMORY and no endpoint set, so the peer is not
 * told to redirect using a payload that does not exist. pMsg->code is also
 * set to TSDB_CODE_OUT_OF_MEMORY, as before.
 */
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
  pMsg->info.hasEpSet = 1;
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType};

  // A NULL buffer makes the serializer report the required length only.
  int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    // The message actually sent is rsp: report the failure on it and drop
    // the "has endpoint set" flag, since there is no body to decode.
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    rsp.info.hasEpSet = 0;
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
    rsp.contLen = contLen;
  }

  dmSendRsp(&rsp);

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
}
59

S
Shengliang Guan 已提交
60 61
/*
 * Dispatch pMsg to the handler registered in pWrapper for its message type.
 *
 * Returns the handler's result. When no handler is registered for the type,
 * sets terrno to TSDB_CODE_MSG_NOT_PROCESSED and returns -1.
 * Before dispatching, pMsg->info.wrapper is pointed at pWrapper.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];

  if (handler != NULL) {
    // NOTE(review): `trace` has no other visible use here; presumably the
    // dGTrace macro picks it up by name, so the identifier must stay as is.
    const STraceId *trace = &pMsg->info.traceId;
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);

    pMsg->info.wrapper = pWrapper;
    return handler(pWrapper->pMgmt, pMsg);
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return -1;
}

/*
 * RPC callback: route an incoming message to the node wrapper that owns it.
 *
 * pDnode - dnode instance whose handle table and wrappers are consulted.
 * pRpc   - received message. On failure its pCont is freed and set to NULL.
 * pEpSet - optional endpoint set; only used for TDMT_MND_STATUS_RSP.
 *
 * Flow:
 *   1. A few message types are handled inline and return immediately.
 *   2. If the dnode is not running, only TDMT_DND_SERVER_STATUS is served.
 *   3. The target wrapper is picked from the handle table (optionally
 *      overridden by the vgId in the message head), marked, and the message
 *      is copied into a queue item that is either put on the wrapper's proc
 *      queue or processed directly.
 *   4. On any failure a response is sent for requests, the queue item (if
 *      any) is freed and the request content is released.
 *
 * Invariant at _OVER: pWrapper is non-NULL only if dmMarkWrapper() succeeded
 * on it, because _OVER unconditionally calls dmReleaseWrapper(pWrapper) and
 * uses pWrapper != NULL to choose dmSendRsp(), which reads info.wrapper.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  // NOTE(review): `trace` is presumably consumed by the dGTrace macro; keep the name.
  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  // Message types that bypass the wrapper dispatch entirely.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
      qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    default:
      break;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
      goto _OVER;
    }
  }

  if (IsReq(pRpc) && pRpc->pCont == NULL) {
    terrno = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  }

  if (pHandle->defaultNtype == NODE_END) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      // The wrapper has not been marked yet: clear it so _OVER neither
      // releases a reference that was never taken nor routes the response
      // through dmSendRsp() with an unset info.wrapper.
      pWrapper = NULL;
      terrno = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if (dmMarkWrapper(pWrapper) != 0) {
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;
  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);

  if (InParentProc(pWrapper)) {
    code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
  } else {
    code = dmProcessNodeMsg(pWrapper, pMsg);
  }

_OVER:
  if (code != 0) {
    // Prefer the specific error recorded in terrno over the generic -1.
    if (terrno != 0) code = terrno;
    dGTrace("msg:%p, failed to process since %s", pMsg, terrstr());

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info};
      // For message types strictly between TDMT_MND_MSG and TDMT_VND_MSG,
      // attach the mnode endpoint set to the failure response.
      if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
          pRpc->msgType < TDMT_VND_MSG) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      if (pWrapper != NULL) {
        dmSendRsp(&rsp);
      } else {
        rpcSendResponse(&rsp);
      }
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  // Called with NULL on every path that failed before a successful mark.
  dmReleaseWrapper(pWrapper);
}

S
Shengliang Guan 已提交
196
/*
 * Populate the dnode's message dispatch tables from every node wrapper.
 *
 * For each node type in [DNODE, NODE_END) the wrapper's getHandlesFp() is
 * called to obtain an array of SMgmtHandle entries. For every entry:
 *   - if it asks for a vgId check, that flag is copied into the shared
 *     handle slot for the message type;
 *   - otherwise this node type becomes the slot's default node type;
 *   - the entry's msgFp is stored in the wrapper's own msgFps table.
 *
 * Returns 0 on success, -1 as soon as a wrapper returns a NULL array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    SArray *pHandles = pNode->func.getHandlesFp();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandles, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pNode->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}

S
Shengliang Guan 已提交
222
/*
 * Send a request through the dnode's client RPC handle.
 *
 * Returns 0 after handing the message to rpcSendRequest(). If the dnode is
 * not in DND_STAT_RUNNING, the message content is freed, terrno is set to
 * TSDB_CODE_NODE_OFFLINE, the failure is logged and -1 is returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pNode = dmInstance();

  if (pNode->status == DND_STAT_RUNNING) {
    rpcSendRequest(pNode->trans.clientRpc, pEpSet, pMsg, NULL);
    return 0;
  }

  // Not running: drop the payload and report the node as offline.
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  terrno = TSDB_CODE_NODE_OFFLINE;
  dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
  return -1;
}

236 237
/*
 * Register pMsg as the broken-link argument.
 *
 * When InChildProc() holds for the message's wrapper, the message is put on
 * the wrapper's proc queue tagged DND_FUNC_REGIST; otherwise
 * rpcRegisterBrokenLinkArg() is called directly.
 */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  SMgmtWrapper *pOwner = pMsg->info.wrapper;

  if (InChildProc(pOwner)) {
    dmPutToProcPQueue(&pOwner->proc, pMsg, DND_FUNC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

245 246
/*
 * Release an RPC handle.
 *
 * When InChildProc() holds for the handle's wrapper, a temporary message
 * carrying `type` in its code field and a copy of the handle info is put on
 * the wrapper's proc queue tagged DND_FUNC_RELEASE; otherwise
 * rpcReleaseHandle() is called directly.
 */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  SMgmtWrapper *pOwner = pHandle->wrapper;

  if (InChildProc(pOwner)) {
    SRpcMsg releaseMsg = {0};
    releaseMsg.code = type;
    releaseMsg.info = *pHandle;
    dmPutToProcPQueue(&pOwner->proc, &releaseMsg, DND_FUNC_RELEASE);
    return;
  }

  rpcReleaseHandle(pHandle, type);
}

dengyihao's avatar
dengyihao 已提交
255
/*
 * Predicate installed as SRpcInit.rfp for the client RPC.
 *
 * Returns true when `code` is one of the listed RPC/node error codes AND
 * `msgType` is not one of the four TDMT_SCH_* query/fetch types; returns
 * false in every other case.
 * NOTE(review): presumably the RPC layer uses this to decide whether a
 * failed request is retried - confirm against the rfp contract.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool codeListed = (code == TSDB_CODE_RPC_REDIRECT) || (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) ||
                          (code == TSDB_CODE_NODE_NOT_DEPLOYED) || (code == TSDB_CODE_SYN_NOT_LEADER) ||
                          (code == TSDB_CODE_APP_NOT_READY) || (code == TSDB_CODE_RPC_BROKEN_LINK);
  if (!codeListed) {
    return false;
  }

  const bool isSchQueryOrFetch = (msgType == TDMT_SCH_QUERY) || (msgType == TDMT_SCH_MERGE_QUERY) ||
                                 (msgType == TDMT_SCH_FETCH) || (msgType == TDMT_SCH_MERGE_FETCH);
  return !isSchQueryOrFetch;
}
M
Minghao Li 已提交
266

267
/*
 * Open the dnode's client-side RPC endpoint and store it in
 * pDnode->trans.clientRpc.
 *
 * Returns 0 on success, -1 (after logging) if rpcOpen() fails.
 */
int32_t dmInitClient(SDnode *pDnode) {
  // Fields not named here are zero-initialized.
  SRpcInit clientCfg = {
      .label = "DND-C",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
  };

  SDnodeTrans *pTrans = &pDnode->trans;
  pTrans->clientRpc = rpcOpen(&clientCfg);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client");
  return -1;
}

290
/*
 * Close the dnode's client RPC endpoint if one is open and clear the stored
 * handle. Does nothing when no client RPC handle is set.
 */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

299
/*
 * Open the dnode's server-side RPC endpoint and store it in
 * pDnode->trans.serverRpc.
 *
 * The endpoint is configured from the global settings tsLocalFqdn,
 * tsServerPort, tsNumOfRpcThreads, tsMaxShellConns and tsShellActivityTimer.
 *
 * Returns 0 on success, -1 (after logging) if rpcOpen() fails.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  // Bound the copy by the destination, not by strlen(source): the previous
  // bound let an over-long tsLocalFqdn write past localFqdn. The buffer was
  // zeroed above and the last byte is never written, so the result is always
  // NUL-terminated.
  // NOTE(review): assumes localFqdn is a char array member (sizeof yields
  // its capacity) - confirm against the SRpcInit declaration.
  strncpy(rpcInit.localFqdn, tsLocalFqdn, sizeof(rpcInit.localFqdn) - 1);
  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

323
/*
 * Close the dnode's server RPC endpoint if one is open and clear the stored
 * handle. Does nothing when no server RPC handle is set.
 */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

332 333 334 335 336 337 338 339 340
/*
 * Build the message callback table for this dnode.
 *
 * The returned SMsgCb (by value) carries the dnode's client RPC handle and
 * the send / redirect / broken-link / release / startup-report function
 * pointers; every field not set here is zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb callbacks = {0};

  callbacks.clientRpc = pDnode->trans.clientRpc;
  callbacks.sendReqFp = dmSendReq;
  callbacks.sendRspFp = dmSendRsp;
  callbacks.sendRedirectRspFp = dmSendRedirectRsp;
  callbacks.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dmReleaseHandle;
  callbacks.reportStartupFp = dmReportStartup;

  return callbacks;
}