dmTransport.c 18.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
18 19
#include "qworker.h"

20 21
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
22
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
// Copies the dnode's stored mnode endpoint set into *pEpSet.
// The copy is made between taosRLockLatch/taosRUnLockLatch on pDnode->latch.
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  taosRLockLatch(&pDnode->latch);
  memcpy(pEpSet, &pDnode->mnodeEps, sizeof(*pEpSet));
  taosRUnLockLatch(&pDnode->latch);
}

S
Shengliang Guan 已提交
30
// Replaces the dnode's stored mnode endpoint set with *pEpSet.
// A summary line is logged before the latch is taken; the per-endpoint lines
// are logged while the write latch on pDnode->latch is still held.
static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);

  taosWLockLatch(&pDnode->latch);
  pDnode->mnodeEps = *pEpSet;

  int32_t idx = 0;
  while (idx < pEpSet->numOfEps) {
    dInfo("mnode index:%d %s:%u", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);
    ++idx;
  }

  taosWUnLockLatch(&pDnode->latch);
}
41

S
Shengliang Guan 已提交
42
// Returns the handler stored in pWrapper->msgFps for pRpc->msgType.
// When the slot is empty, terrno is set to TSDB_CODE_MSG_NOT_PROCESSED and
// NULL is returned.
static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler != NULL) {
    return handler;
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return NULL;
}

S
Shengliang Guan 已提交
51
// Fills *pMsg from the incoming rpc message *pRpc.
//
// For requests (odd msgType) the connection info is fetched with
// rpcGetConnInfo; if that fails terrno is set to
// TSDB_CODE_MND_NO_USER_FROM_CONN and -1 is returned. For non-requests the
// user/ip/port fields are taken from a zeroed SRpcConnInfo.
// Returns 0 on success.
static inline int32_t dmBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
  const bool   isReq = (pRpc->msgType & 1U) != 0;
  SRpcConnInfo conn = {0};

  if (isReq) {
    if (rpcGetConnInfo(pRpc->handle, &conn) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  memcpy(pMsg->user, conn.user, TSDB_USER_LEN);
  pMsg->clientIp = conn.clientIp;
  pMsg->clientPort = conn.clientPort;
  memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));

  // Requests are expected to carry a non-zero refId.
  if (isReq) {
    assert(pRpc->refId != 0);
  }

  return 0;
}

S
Shengliang Guan 已提交
70
// Wraps an rpc message into an SNodeMsg and hands it to the node identified
// by pWrapper.
//
// Steps: mark the wrapper, look up the handler for the message type, allocate
// and build the SNodeMsg, then either call the handler directly (procType is
// not DND_PROC_PARENT) or push the message to the child-process queue.
//
// On success nothing is freed here unless running as the parent process, in
// which case the local copy and the rpc content are released after queuing.
// On failure a response is sent for requests (NOT_DEPLOYED/OFFLINE are turned
// into TSDB_CODE_NODE_REDIRECT for message types between TDMT_MND_MSG and
// TDMT_VND_MSG) and both the queue item and the rpc content are freed.
// pEpSet is not used by this function.
static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  const uint16_t msgType = pRpc->msgType;
  const bool     isReq = msgType & 1U;
  int32_t        code = -1;
  SNodeMsg      *pMsg = NULL;

  // The wrapper only has to be released again if marking succeeded.
  const bool marked = (dmMarkWrapper(pWrapper) == 0);

  // Single-pass block: any failed step leaves code != 0 and skips the rest.
  do {
    if (!marked) break;

    NodeMsgFp msgFp = dmGetMsgFp(pWrapper, pRpc);
    if (msgFp == NULL) break;

    pMsg = taosAllocateQitem(sizeof(SNodeMsg));
    if (pMsg == NULL) break;

    if (dmBuildMsg(pMsg, pRpc) != 0) break;

    if (pWrapper->procType == DND_PROC_PARENT) {
      dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg,
             TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen);
      code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
                                 (isReq && (pMsg->rpcMsg.code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
    } else {
      dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
      code = (*msgFp)(pWrapper->pMgmt, pMsg);
    }
  } while (0);

  if (code != 0) {
    dError("msg:%p, type:%s handle:%p failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), pRpc->handle,
           code & 0XFFFF, terrstr());

    if (isReq) {
      // Prefer the thread-local error code when one has been set.
      if (terrno != 0) {
        code = terrno;
      }

      const bool nodeUnavailable = (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE);
      if (nodeUnavailable && msgType > TDMT_MND_MSG && msgType < TDMT_VND_MSG) {
        code = TSDB_CODE_NODE_REDIRECT;
      }

      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId};
      tmsgSendRsp(&rsp);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  } else if (pWrapper->procType == DND_PROC_PARENT) {
    dTrace("msg:%p, freed in parent process", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  if (marked) {
    dmReleaseWrapper(pWrapper);
  }
}

S
Shengliang Guan 已提交
126
// Entry point for every rpc message received by the dnode (installed as
// rpcInit.cfp by dmInitClient and dmInitServer).
//
// - Server-status (while the dnode is not running), net-test and fetch/retrieve
//   responses are handled directly and never reach a node wrapper.
// - Messages arriving while the dnode is not running, requests without
//   content, and message types with no registered handler are rejected; a
//   response is sent for requests.
// - Everything else is routed to the wrapper selected by the handle table. When
//   the handle asks for a vgId check, the vgId in the message head may redirect
//   the message to the QNODE or MNODE wrapper.
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  tmsg_t        msgType = pMsg->msgType;
  bool          isReq = msgType & 1u;
  SMsgHandle   *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
  SMgmtWrapper *pWrapper = NULL;

  switch (msgType) {
    case TDMT_DND_SERVER_STATUS:
      if (pDnode->status != DND_STAT_RUNNING) {
        dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
        dmProcessServerStartupStatus(pDnode, pMsg);
        return;
      }
      // Once running, the status request goes through the normal dispatch below.
      break;
    case TDMT_DND_NET_TEST:
      dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
      dmProcessNetTestReq(pDnode, pMsg);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_VND_FETCH_RSP:
      dTrace("retrieve rsp is received");
      qWorkerProcessFetchRsp(NULL, NULL, pMsg);
      pMsg->pCont = NULL;  // already freed in qworker
      return;
    default:
      break;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {
          .handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (isReq && pMsg->pCont == NULL) {
    dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    SRpcMsg rspMsg = {
        .handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
    rpcSendResponse(&rspMsg);
    return;
  }

  if (pHandle->defaultNtype == NODE_END) {
    dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {
          .handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
    return;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];

  // Only requests are guaranteed to have content at this point; a non-request
  // message may still arrive with pCont == NULL, so the head must not be read
  // unconditionally. Without content the default wrapper is kept.
  if (pHandle->needCheckVgId && pMsg->pCont != NULL) {
    SMsgHead *pHead = pMsg->pCont;
    int32_t   vgId = ntohl(pHead->vgId);
    if (vgId == QNODE_HANDLE) {
      pWrapper = &pDnode->wrappers[QNODE];
    } else if (vgId == MNODE_HANDLE) {
      pWrapper = &pDnode->wrappers[MNODE];
    }
  }

  dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
  if (isReq) {
    assert(pMsg->refId != 0);
  }
  dmProcessRpcMsg(pWrapper, pMsg, pEpSet);
}

S
Shengliang Guan 已提交
203
// Builds the message dispatch tables for all node types from DNODE up to (not
// including) NODE_END.
//
// For each node type the wrapper's getHandlesFp is called to obtain an array
// of SMgmtHandle entries. Each entry stores its handler in the wrapper's
// msgFps table and updates the shared pTrans->msgHandles slot: entries with
// needCheckVgId set propagate that flag, all others record the node type as
// the slot's defaultNtype.
// Returns 0 on success, -1 if a wrapper returns no handle array.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  EDndNodeType ntype = DNODE;
  while (ntype < NODE_END) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle *pMgmt = taosArrayGet(pHandles, idx);
      SMsgHandle  *pSlot = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];

      if (pMgmt->needCheckVgId) {
        pSlot->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
    }

    taosArrayDestroy(pHandles);
    ++ntype;
  }

  return 0;
}

S
Shengliang Guan 已提交
229
// Answers pReq with a TSDB_CODE_RPC_REDIRECT response whose content is the
// serialized current mnode endpoint set.
//
// If one of the endpoints matches this dnode's own fqdn/port, inUse is moved
// to the next endpoint. Ports are converted with htons before serialization.
static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
  SEpSet epSet = {0};
  dmGetMnodeEpSet(pDnode, &epSet);

  dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    if (strcmp(epSet.eps[i].fqdn, pDnode->input.localFqdn) == 0 && epSet.eps[i].port == pDnode->input.serverPort) {
      epSet.inUse = (i + 1) % epSet.numOfEps;
    }

    epSet.eps[i].port = htons(epSet.eps[i].port);
  }

  // Zero-initialize so that every field not set below has a defined value when
  // the message is handed to rpcSendResponse.
  SRpcMsg resp = {0};
  SMEpSet msg = {.epSet = epSet};
  int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
  resp.pCont = rpcMallocCont(len);
  resp.contLen = len;
  tSerializeSMEpSet(resp.pCont, len, &msg);

  resp.code = TSDB_CODE_RPC_REDIRECT;
  resp.handle = pReq->handle;
  resp.refId = pReq->refId;
  rpcSendResponse(&resp);
}

S
Shengliang Guan 已提交
255
// Sends *pRsp through the rpc layer. A response whose code is
// TSDB_CODE_NODE_REDIRECT is replaced by a redirect response built by
// dmSendRpcRedirectRsp.
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
  if (pRsp->code != TSDB_CODE_NODE_REDIRECT) {
    rpcSendResponse(pRsp);
    return;
  }

  dmSendRpcRedirectRsp(pDnode, pRsp);
}

S
Shengliang Guan 已提交
263
// Sends *pReq to pEpSet through the client rpc with rpcSendRecv, which
// receives the reply in *pRsp.
// If the dnode is not running, nothing is sent: pRsp->code is set to
// TSDB_CODE_NODE_OFFLINE and the request content is freed and cleared.
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
  if (pDnode->status == DND_STAT_RUNNING) {
    rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
    return;
  }

  pRsp->code = TSDB_CODE_NODE_OFFLINE;
  rpcFreeCont(pReq->pCont);
  pReq->pCont = NULL;
}

S
Shengliang Guan 已提交
273
// Sends *pReq to the dnode's current mnode endpoint set via dmSendRecv, which
// fills *pRsp.
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SDnode *pDnode = pWrapper->pDnode;
  SEpSet  mnodeEps = {0};

  dmGetMnodeEpSet(pDnode, &mnodeEps);
  dmSendRecv(pDnode, &mnodeEps, pReq, pRsp);
}

S
Shengliang Guan 已提交
279
// Sends *pReq to pEpSet through the dnode's client rpc with rpcSendRequest.
// Returns 0 once the request has been passed to rpcSendRequest.
// If the dnode is not running or has no client rpc, the request content is
// freed and cleared, terrno is set to TSDB_CODE_NODE_OFFLINE and -1 is
// returned.
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  SDnode    *pDnode = pWrapper->pDnode;
  const bool canSend = (pDnode->status == DND_STAT_RUNNING) && (pDnode->trans.clientRpc != NULL);

  if (canSend) {
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
    return 0;
  }

  rpcFreeCont(pReq->pCont);
  pReq->pCont = NULL;
  terrno = TSDB_CODE_NODE_OFFLINE;
  dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
  return -1;
}

S
Shengliang Guan 已提交
293
// Delivers a response produced by a node.
// In a child process the response is pushed to the parent queue tagged
// PROC_FUNC_RSP; otherwise it is sent directly with dmSendRpcRsp.
static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pWrapper->procType == DND_PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
    return;
  }

  dmSendRpcRsp(pWrapper->pDnode, pRsp);
}
S
Shengliang Guan 已提交
300

M
Minghao Li 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
// Sends a redirect response that points the peer at *pNewEpSet.
// The caller must pass a response with code TSDB_CODE_RPC_REDIRECT and no
// content (both are asserted).
// In a child process *pRsp is forwarded unchanged to the parent queue;
// otherwise a new response carrying the serialized endpoint set is built and
// sent with rpcSendResponse.
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
  ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
  ASSERT(pRsp->pCont == NULL);

  if (pWrapper->procType == DND_PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
    return;
  }

  SRpcMsg redirect = {0};
  SMEpSet payload = {.epSet = *pNewEpSet};

  // The first call only computes the encoded size; the second one writes it.
  int32_t payloadLen = tSerializeSMEpSet(NULL, 0, &payload);
  redirect.pCont = rpcMallocCont(payloadLen);
  redirect.contLen = payloadLen;
  tSerializeSMEpSet(redirect.pCont, payloadLen, &payload);

  redirect.code = TSDB_CODE_RPC_REDIRECT;
  redirect.handle = pRsp->handle;
  redirect.refId = pRsp->refId;
  rpcSendResponse(&redirect);
}

S
Shengliang Guan 已提交
321
// Registers *pMsg as the broken-link argument.
// In a child process the message is pushed to the parent queue tagged
// PROC_FUNC_REGIST; otherwise rpcRegisterBrokenLinkArg is called directly.
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  if (pWrapper->procType == DND_PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

S
Shengliang Guan 已提交
329
// Releases an rpc handle.
// In a child process a content-less message carrying the handle (with `type`
// stored in its code field) is pushed to the parent queue tagged
// PROC_FUNC_RELEASE; otherwise rpcReleaseHandle is called directly.
static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
  if (pWrapper->procType == DND_PROC_CHILD) {
    SRpcMsg releaseMsg = {.handle = handle, .code = type};
    taosProcPutToParentQ(pWrapper->procObj, &releaseMsg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE);
    return;
  }

  rpcReleaseHandle(handle, type);
}

S
Shengliang Guan 已提交
338 339
// Consumer for the child-process queue (installed as childConsumeFp by
// dmGenProcCfg).
//
// Re-attaches the content buffer to the message and calls the handler
// registered for its message type. When the handler fails, a response is sent
// for requests and both the queue item and the content are freed here. On
// success nothing is freed here.
// msgLen, contLen and ftype are not used by this function.
static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                EProcFuncType ftype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  int32_t   code = (*msgFp)(pWrapper->pMgmt, pMsg);

  if (code != 0) {
    dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    if (pRpc->msgType & 1U) {
      // Prefer terrno, but fall back to the handler's return value when terrno
      // is 0 so a failed request is never answered with a success code (same
      // rule as in dmProcessRpcMsg).
      int32_t rspCode = (terrno != 0) ? terrno : code;
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = rspCode, .refId = pRpc->refId};
      dmSendRsp(pWrapper, &rsp);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pCont);
  }
}

S
Shengliang Guan 已提交
360 361
// Consumer for the parent-process queue (installed as parentConsumeFp by
// dmGenProcCfg).
//
// Re-attaches the content buffer to the message and dispatches on ftype:
//   PROC_FUNC_REQ     - send the request; the target SEpSet is read from the
//                       bytes that follow the SRpcMsg header
//   PROC_FUNC_RSP     - restore refId from the proc handle table and send the
//                       response
//   PROC_FUNC_REGIST  - register the broken-link argument
//   PROC_FUNC_RELEASE - drop the handle from the proc table, release it in the
//                       rpc layer and free the content
// The message header itself is always freed before returning.
// msgLen and contLen are not used by this function.
static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 EProcFuncType ftype) {
  int32_t code = pMsg->code & 0xFFFF;
  pMsg->pCont = pCont;

  switch (ftype) {
    case PROC_FUNC_REQ:
      ASSERT(1);
      dTrace("msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType),
             pMsg->handle, code, pMsg->ahandle);
      dmSendReq(pWrapper, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
      break;

    case PROC_FUNC_RSP:
      dTrace("msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
             pMsg->ahandle);
      pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
      dmSendRpcRsp(pWrapper->pDnode, pMsg);
      break;

    case PROC_FUNC_REGIST:
      dTrace("msg:%p, get from parent queue, regist handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
             pMsg->ahandle);
      rpcRegisterBrokenLinkArg(pMsg);
      break;

    case PROC_FUNC_RELEASE:
      dTrace("msg:%p, get from parent queue, release handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
             pMsg->ahandle);
      taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;

    default:
      dError("msg:%p, invalid ftype:%d while get from parent queue, handle:%p", pMsg, ftype, pMsg->handle);
      break;
  }

  taosMemoryFree(pMsg);
}

S
Shengliang Guan 已提交
391
// Builds the multi-process configuration for pWrapper: the consume, allocate
// and free callbacks for the child and parent queues, plus the wrapper's
// shared memory and name. Fields not listed here stay zero.
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg cfg = {0};

  // Child side: headers are queue items, bodies are rpc content buffers.
  cfg.childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue;
  cfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  cfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  cfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  cfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // Parent side: headers come from taosMemoryMalloc, bodies are rpc content buffers.
  cfg.parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue;
  cfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  cfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  cfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  cfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  cfg.shm = pWrapper->procShm;
  cfg.parent = pWrapper;
  cfg.name = pWrapper->name;

  return cfg;
}

410
static bool rpcRfp(int32_t code) {
M
Minghao Li 已提交
411 412 413 414 415 416 417
  if (code == TSDB_CODE_RPC_REDIRECT) {
    return true;
  } else {
    return false;
  }
}

418
// Opens the dnode's client rpc and stores it in pDnode->trans.clientRpc.
// The client uses one thread, 1024 sessions, the internal user/ckey and the
// encrypted internal secret.
// Returns 0 on success, -1 if rpcOpen fails.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // The secret buffer lives on this stack frame, so it is only valid for the
  // duration of the rpcOpen call below.
  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);

  SRpcInit rpcInit = {
      .label = "DND",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .user = INTERNAL_USER,
      .ckey = INTERNAL_CKEY,
      .spi = 1,
      .parent = pDnode,
      .rfp = rpcRfp,
      .secret = pass,
  };

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

448
// Closes the dnode's client rpc if one is open and clears the stored pointer.
// Does nothing when no client rpc is open.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

S
Shengliang Guan 已提交
457 458
// Resolves authentication data for the two built-in users without asking a
// mnode: INTERNAL_USER (secret derived from INTERNAL_SECRET) and
// TSDB_NETTEST_USER (secret derived from the user name itself).
//
// On a match, TSDB_PASSWORD_LEN bytes of the encrypted secret are copied to
// `secret`, *spi is set to 1, *encrypt and *ckey to 0, and 0 is returned.
// For any other user nothing is written and -1 is returned.
// pDnode is not used by this function.
static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                        char *ckey) {
  const char *plain = NULL;
  if (strcmp(user, INTERNAL_USER) == 0) {
    plain = INTERNAL_SECRET;
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    plain = TSDB_NETTEST_USER;
  }

  if (plain == NULL) {
    return -1;
  }

  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)plain, strlen(plain), pass);

  memcpy(secret, pass, TSDB_PASSWORD_LEN);
  *spi = 1;
  *encrypt = 0;
  *ckey = 0;
  return 0;
}

S
Shengliang Guan 已提交
480 481 482
// Authentication callback (installed as rpcInit.afp by dmInitServer).
//
// Built-in users are answered locally through dmGetHideUserAuth. For every
// other user a TDMT_MND_AUTH request is sent to the current mnode endpoint set
// and the reply is copied into secret/ckey/spi/encrypt.
// Returns 0 on success; otherwise the response code, which is also stored in
// terrno.
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                             char *ckey) {
  if (dmGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
    return 0;
  }

  // Serialize the auth request: the first call computes the size, the second
  // one writes into the rpc content buffer.
  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t reqLen = tSerializeSAuthReq(NULL, 0, &authReq);
  void   *pReqCont = rpcMallocCont(reqLen);
  tSerializeSAuthReq(pReqCont, reqLen, &authReq);

  SRpcMsg rpcMsg = {.pCont = pReqCont, .contLen = reqLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
  SRpcMsg rpcRsp = {0};
  SEpSet  mnodeEps = {0};
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
  dmGetMnodeEpSet(pDnode, &mnodeEps);
  dmSendRecv(pDnode, &mnodeEps, &rpcMsg, &rpcRsp);

  if (rpcRsp.code == 0) {
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
  } else {
    terrno = rpcRsp.code;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

518
// Opens the dnode's server rpc on the configured fqdn/port and stores it in
// pDnode->trans.serverRpc. Incoming messages are delivered to dmProcessMsg and
// users are authenticated through dmRetrieveUserAuthInfo.
// Returns 0 on success, -1 if rpcOpen fails.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};

  // Bound the copy by the destination, not the source. rpcInit is
  // zero-initialized and at most size-1 bytes are written, so the result is
  // always NUL-terminated (an over-long fqdn is truncated rather than written
  // past the buffer).
  // NOTE(review): relies on rpcInit.localFqdn being a char array - confirm
  // against the SRpcInit declaration.
  strncpy(rpcInit.localFqdn, pDnode->input.localFqdn, sizeof(rpcInit.localFqdn) - 1);
  rpcInit.localPort = pDnode->input.serverPort;
  rpcInit.label = "DND";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.afp = (RpcAfp)dmRetrieveUserAuthInfo;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

544
// Closes the dnode's server rpc if one is open and clears the stored pointer.
// Does nothing when no server rpc is open.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

S
Shengliang Guan 已提交
553 554
// Builds the message callback table handed to a node: the wrapper itself, the
// dnode's current client rpc pointer and the transport functions of this file.
// Fields not listed here stay zero.
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb msgCb = {0};

  msgCb.pWrapper = pWrapper;
  msgCb.clientRpc = pWrapper->pDnode->trans.clientRpc;

  msgCb.sendReqFp = dmSendReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.sendMnodeRecvFp = dmSendToMnodeRecv;
  msgCb.sendRedirectRspFp = dmSendRedirectRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;
  msgCb.reportStartupFp = dmReportStartupByWrapper;

  return msgCb;
}