dmTransport.c 17.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmImp.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19 20
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
21
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
24
  taosRLockLatch(&pDnode->data.latch);
S
Shengliang Guan 已提交
25
  *pEpSet = pDnode->data.mnodeEps;
S
Shengliang Guan 已提交
26 27 28
  taosRUnLockLatch(&pDnode->data.latch);
}

S
Shengliang Guan 已提交
29
static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
30 31 32
  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);

  taosWLockLatch(&pDnode->data.latch);
S
Shengliang Guan 已提交
33
  pDnode->data.mnodeEps = *pEpSet;
S
Shengliang Guan 已提交
34 35 36 37 38
  for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
    dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
  }

  taosWUnLockLatch(&pDnode->data.latch);
S
Shengliang Guan 已提交
39
}
40

S
Shengliang Guan 已提交
41
static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
42 43 44
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (msgFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
45 46
  }

S
Shengliang Guan 已提交
47
  return msgFp;
48 49
}

S
Shengliang Guan 已提交
50
static inline int32_t dmBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64
  SRpcConnInfo connInfo = {0};
  if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->ahandle, pRpc->handle);
    return -1;
  }

  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  pMsg->clientIp = connInfo.clientIp;
  pMsg->clientPort = connInfo.clientPort;
  memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
  return 0;
}

S
Shengliang Guan 已提交
65
static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
66 67 68
  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;
  NodeMsgFp msgFp = NULL;
S
Shengliang Guan 已提交
69
  uint16_t  msgType = pRpc->msgType;
S
Shengliang Guan 已提交
70

71
  if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
S
Shengliang Guan 已提交
72
    dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
S
Shengliang Guan 已提交
73 74
  }

S
Shengliang Guan 已提交
75 76
  if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
  if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
S
Shengliang Guan 已提交
77
  if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
S
Shengliang Guan 已提交
78
  if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
S
Shengliang Guan 已提交
79

S
Shengliang Guan 已提交
80
  if (pWrapper->procType == DND_PROC_SINGLE) {
S
Shengliang Guan 已提交
81
    dTrace("msg:%p, is created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
S
Shengliang Guan 已提交
82
    code = (*msgFp)(pWrapper, pMsg);
S
Shengliang Guan 已提交
83
  } else if (pWrapper->procType == DND_PROC_PARENT) {
S
Shengliang Guan 已提交
84 85
    dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType),
           pRpc->handle, pMsg->user);
S
Shengliang Guan 已提交
86
    code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
S
Shengliang Guan 已提交
87
                               PROC_FUNC_REQ);
S
Shengliang Guan 已提交
88 89 90 91 92 93 94
  } else {
    dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    ASSERT(1);
  }

_OVER:
  if (code == 0) {
S
Shengliang Guan 已提交
95
    if (pWrapper->procType == DND_PROC_PARENT) {
S
Shengliang Guan 已提交
96 97 98 99 100
      dTrace("msg:%p, is freed in parent process", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
S
Shengliang Guan 已提交
101
    dError("msg:%p, type:%s failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), code & 0XFFFF, terrstr());
102
    if (msgType & 1U) {
103
      if (terrno != 0) code = terrno;
104 105 106 107 108 109 110
      if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
        if (msgType > TDMT_MND_MSG && msgType < TDMT_VND_MSG) {
          code = TSDB_CODE_NODE_REDIRECT;
        }
      }

      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
S
Shengliang Guan 已提交
111 112 113 114 115 116 117
      tmsgSendRsp(&rsp);
    }
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

S
Shengliang Guan 已提交
118
  dmReleaseWrapper(pWrapper);
S
Shengliang Guan 已提交
119 120
}

S
Shengliang Guan 已提交
121
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
122
  SDnodeTrans * pTrans = &pDnode->trans;
S
Shengliang Guan 已提交
123 124
  tmsg_t        msgType = pMsg->msgType;
  bool          isReq = msgType & 1u;
dengyihao's avatar
dengyihao 已提交
125
  SMsgHandle *  pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
S
Shengliang Guan 已提交
126
  SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
S
Shengliang Guan 已提交
127

S
Shengliang Guan 已提交
128 129 130
  if (msgType == TDMT_DND_SERVER_STATUS) {
    dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
    dmProcessServerStatusReq(pDnode, pMsg);
S
Shengliang Guan 已提交
131 132
    return;
  }
S
Shengliang Guan 已提交
133

S
Shengliang Guan 已提交
134
  if (pDnode->status != DND_STAT_RUNNING) {
S
Shengliang Guan 已提交
135 136 137 138 139 140
    dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
141 142 143
    return;
  }

S
Shengliang Guan 已提交
144 145
  if (isReq && pMsg->pCont == NULL) {
    dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
146
    SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle};
S
Shengliang Guan 已提交
147 148 149 150 151 152 153 154 155 156 157
    rpcSendResponse(&rspMsg);
    return;
  }

  if (pWrapper == NULL) {
    dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
158
    return;
S
Shengliang Guan 已提交
159 160 161 162 163
  }

  if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
    SMsgHead *pHead = pMsg->pCont;
    int32_t   vgId = ntohl(pHead->vgId);
S
Shengliang Guan 已提交
164
    if (vgId == QNODE_HANDLE) {
S
Shengliang Guan 已提交
165
      pWrapper = pHandle->pQndWrapper;
S
Shengliang Guan 已提交
166
    } else if (vgId == MNODE_HANDLE) {
S
Shengliang Guan 已提交
167
      pWrapper = pHandle->pMndWrapper;
168 169
    } else {
    }
S
Shengliang Guan 已提交
170
  }
S
Shengliang Guan 已提交
171 172

  dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
S
Shengliang Guan 已提交
173
  dmProcessRpcMsg(pWrapper, pMsg, pEpSet);
S
Shengliang Guan 已提交
174 175
}

S
Shengliang Guan 已提交
176
int32_t dmInitMsgHandle(SDnode *pDnode) {
S
Shengliang Guan 已提交
177
  SDnodeTrans *pTrans = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
178

S
Shengliang Guan 已提交
179
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
S
shm  
Shengliang Guan 已提交
180
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
181 182

    for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
S
shm  
Shengliang Guan 已提交
183
      NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
S
Shengliang Guan 已提交
184
      int8_t    vgId = pWrapper->msgVgIds[msgIndex];
S
shm  
Shengliang Guan 已提交
185
      if (msgFp == NULL) continue;
S
shm  
Shengliang Guan 已提交
186

S
Shengliang Guan 已提交
187
      SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex];
S
Shengliang Guan 已提交
188
      if (vgId == QNODE_HANDLE) {
S
Shengliang Guan 已提交
189 190 191 192 193
        if (pHandle->pQndWrapper != NULL) {
          dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
          return -1;
        }
        pHandle->pQndWrapper = pWrapper;
S
Shengliang Guan 已提交
194
      } else if (vgId == MNODE_HANDLE) {
S
Shengliang Guan 已提交
195 196 197 198 199
        if (pHandle->pMndWrapper != NULL) {
          dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
          return -1;
        }
        pHandle->pMndWrapper = pWrapper;
S
shm  
Shengliang Guan 已提交
200
      } else {
S
Shengliang Guan 已提交
201
        if (pHandle->pNdWrapper != NULL) {
S
Shengliang Guan 已提交
202 203
          dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
          return -1;
204
        }
S
Shengliang Guan 已提交
205
        pHandle->pNdWrapper = pWrapper;
S
shm  
Shengliang Guan 已提交
206 207 208 209 210 211 212
      }
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
213
static inline int32_t dmSendRpcReq(SDnode *pDnode, const SEpSet *pEpSet, SRpcMsg *pReq) {
S
Shengliang Guan 已提交
214
  if (pDnode->trans.clientRpc == NULL) {
215
    terrno = TSDB_CODE_NODE_OFFLINE;
S
Shengliang Guan 已提交
216 217 218
    return -1;
  }

S
Shengliang Guan 已提交
219
  rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
S
Shengliang Guan 已提交
220
  return 0;
S
Shengliang Guan 已提交
221 222
}

S
Shengliang Guan 已提交
223
static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
S
Shengliang Guan 已提交
224
  SEpSet epSet = {0};
S
Shengliang Guan 已提交
225
  dmGetMnodeEpSet(pDnode, &epSet);
S
Shengliang Guan 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239

  dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) {
      epSet.inUse = (i + 1) % epSet.numOfEps;
    }

    epSet.eps[i].port = htons(epSet.eps[i].port);
  }

  rpcSendRedirectRsp(pReq->handle, &epSet);
}

S
Shengliang Guan 已提交
240
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
241
  if (pRsp->code == TSDB_CODE_NODE_REDIRECT) {
S
Shengliang Guan 已提交
242
    dmSendRpcRedirectRsp(pDnode, pRsp);
243 244
  } else {
    rpcSendResponse(pRsp);
S
shm  
Shengliang Guan 已提交
245 246 247
  }
}

S
Shengliang Guan 已提交
248
void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
249 250 251
  rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}

S
Shengliang Guan 已提交
252
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
253
  SEpSet epSet = {0};
S
Shengliang Guan 已提交
254
  dmGetMnodeEpSet(pDnode, &epSet);
S
Shengliang Guan 已提交
255 256 257
  rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
}

S
Shengliang Guan 已提交
258 259
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (pWrapper->pDnode->status != DND_STAT_RUNNING) {
260
    terrno = TSDB_CODE_NODE_OFFLINE;
S
shm  
Shengliang Guan 已提交
261 262 263 264
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
    return -1;
  }

S
Shengliang Guan 已提交
265
  if (pWrapper->procType != DND_PROC_CHILD) {
S
Shengliang Guan 已提交
266
    return dmSendRpcReq(pWrapper->pDnode, pEpSet, pReq);
S
shm  
Shengliang Guan 已提交
267
  } else {
S
Shengliang Guan 已提交
268
    char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
S
shm  
Shengliang Guan 已提交
269 270 271 272
    if (pHead == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
S
Shengliang Guan 已提交
273

S
shm  
Shengliang Guan 已提交
274 275
    memcpy(pHead, pReq, sizeof(SRpcMsg));
    memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
S
Shengliang Guan 已提交
276
    taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
S
Shengliang Guan 已提交
277
                         PROC_FUNC_REQ);
S
shm  
Shengliang Guan 已提交
278 279 280 281 282
    taosMemoryFree(pHead);
    return 0;
  }
}

S
Shengliang Guan 已提交
283
static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
284
  if (pWrapper->procType != DND_PROC_CHILD) {
S
Shengliang Guan 已提交
285
    dmSendRpcRsp(pWrapper->pDnode, pRsp);
S
shm  
Shengliang Guan 已提交
286
  } else {
S
Shengliang Guan 已提交
287
    taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
S
shm  
Shengliang Guan 已提交
288
  }
D
dapan1121 已提交
289
}
S
Shengliang Guan 已提交
290

S
Shengliang Guan 已提交
291
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
292
  if (pWrapper->procType != DND_PROC_CHILD) {
S
Shengliang Guan 已提交
293
    rpcRegisterBrokenLinkArg(pMsg);
S
shm  
Shengliang Guan 已提交
294
  } else {
S
Shengliang Guan 已提交
295
    taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST);
S
shm  
Shengliang Guan 已提交
296 297 298
  }
}

S
Shengliang Guan 已提交
299
static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
S
Shengliang Guan 已提交
300
  if (pWrapper->procType != DND_PROC_CHILD) {
S
shm  
Shengliang Guan 已提交
301 302 303
    rpcReleaseHandle(handle, type);
  } else {
    SRpcMsg msg = {.handle = handle, .code = type};
S
Shengliang Guan 已提交
304
    taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE);
S
Shengliang Guan 已提交
305
  }
S
shm  
Shengliang Guan 已提交
306 307
}

S
Shengliang Guan 已提交
308 309
static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                EProcFuncType ftype) {
S
Shengliang Guan 已提交
310 311 312 313 314 315 316 317 318 319 320
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  int32_t   code = (*msgFp)(pWrapper, pMsg);

  if (code != 0) {
    dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
S
Shengliang Guan 已提交
321
      dmSendRsp(pWrapper, &rsp);
S
Shengliang Guan 已提交
322 323 324 325 326 327 328 329
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pCont);
  }
}

S
Shengliang Guan 已提交
330 331
static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 EProcFuncType ftype) {
S
Shengliang Guan 已提交
332
  pMsg->pCont = pCont;
333 334
  dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle,
         pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle);
S
Shengliang Guan 已提交
335 336

  switch (ftype) {
S
Shengliang Guan 已提交
337
    case PROC_FUNC_REGIST:
S
Shengliang Guan 已提交
338 339
      rpcRegisterBrokenLinkArg(pMsg);
      break;
S
Shengliang Guan 已提交
340
    case PROC_FUNC_RELEASE:
S
Shengliang Guan 已提交
341
      taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
S
Shengliang Guan 已提交
342 343 344
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
S
Shengliang Guan 已提交
345
    case PROC_FUNC_REQ:
S
Shengliang Guan 已提交
346
      dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
S
Shengliang Guan 已提交
347
      break;
S
Shengliang Guan 已提交
348
    case PROC_FUNC_RSP:
S
Shengliang Guan 已提交
349
      taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
S
Shengliang Guan 已提交
350
      dmSendRpcRsp(pWrapper->pDnode, pMsg);
S
Shengliang Guan 已提交
351 352 353 354 355 356 357
      break;
    default:
      break;
  }
  taosMemoryFree(pMsg);
}

S
Shengliang Guan 已提交
358 359
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue,
S
Shengliang Guan 已提交
360 361 362 363
                  .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                  .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                  .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
S
Shengliang Guan 已提交
364
                  .parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue,
S
Shengliang Guan 已提交
365 366 367 368
                  .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
                  .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
                  .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
S
Shengliang Guan 已提交
369
                  .shm = pWrapper->procShm,
370
                  .parent = pWrapper,
S
Shengliang Guan 已提交
371 372
                  .name = pWrapper->name};
  return cfg;
373 374
}

S
Shengliang Guan 已提交
375
static int32_t dmInitClient(SDnode *pDnode) {
S
Shengliang Guan 已提交
376 377 378 379 380
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DND";
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
381
  rpcInit.cfp = (RpcCfp)dmProcessMsg;
S
Shengliang Guan 已提交
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = INTERNAL_USER;
  rpcInit.ckey = INTERNAL_CKEY;
  rpcInit.spi = 1;
  rpcInit.parent = pDnode;

  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
  rpcInit.secret = pass;

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
404
static void dmCleanupClient(SDnode *pDnode) {
S
Shengliang Guan 已提交
405 406 407 408 409 410 411 412
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc) {
    rpcClose(pTrans->clientRpc);
    pTrans->clientRpc = NULL;
    dDebug("dnode rpc client is closed");
  }
}

S
Shengliang Guan 已提交
413 414
static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                        char *ckey) {
S
Shengliang Guan 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
  int32_t code = 0;
  char    pass[TSDB_PASSWORD_LEN + 1] = {0};

  if (strcmp(user, INTERNAL_USER) == 0) {
    taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
  } else {
    code = -1;
  }

  if (code == 0) {
    memcpy(secret, pass, TSDB_PASSWORD_LEN);
    *spi = 1;
    *encrypt = 0;
    *ckey = 0;
  }

  return code;
}

S
Shengliang Guan 已提交
436 437 438
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                             char *ckey) {
  if (dmGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
S
Shengliang Guan 已提交
439 440 441 442 443 444 445
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
    return 0;
  }

  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
dengyihao's avatar
dengyihao 已提交
446
  void *  pReq = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
447 448 449 450 451
  tSerializeSAuthReq(pReq, contLen, &authReq);

  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
  SRpcMsg rpcRsp = {0};
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
S
Shengliang Guan 已提交
452
  dmSendToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
S
Shengliang Guan 已提交
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
  } else {
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
472
static int32_t dmInitServer(SDnode *pDnode) {
S
Shengliang Guan 已提交
473 474 475 476 477 478
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.localPort = pDnode->data.serverPort;
  rpcInit.label = "DND";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
S
Shengliang Guan 已提交
479
  rpcInit.cfp = (RpcCfp)dmProcessMsg;
S
Shengliang Guan 已提交
480 481 482
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
S
Shengliang Guan 已提交
483
  rpcInit.afp = (RpcAfp)dmRetrieveUserAuthInfo;
S
Shengliang Guan 已提交
484 485 486 487 488 489 490 491 492 493 494 495
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
496
static void dmCleanupServer(SDnode *pDnode) {
S
Shengliang Guan 已提交
497 498 499 500 501 502 503 504
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc) {
    rpcClose(pTrans->serverRpc);
    pTrans->serverRpc = NULL;
    dDebug("dnode rpc server is closed");
  }
}

S
Shengliang Guan 已提交
505
int32_t dmInitTrans(SDnode *pDnode) {
S
Shengliang Guan 已提交
506 507
  if (dmInitServer(pDnode) != 0) return -1;
  if (dmInitClient(pDnode) != 0) return -1;
S
Shengliang Guan 已提交
508 509 510
  return 0;
}

S
Shengliang Guan 已提交
511 512 513
void dmCleanupTrans(SDnode *pDnode) {
  dmCleanupServer(pDnode);
  dmCleanupClient(pDnode);
S
Shengliang Guan 已提交
514
}
S
Shengliang Guan 已提交
515 516 517 518 519 520 521

SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb msgCb = {
      .sendReqFp = dmSendReq,
      .sendRspFp = dmSendRsp,
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
      .releaseHandleFp = dmReleaseHandle,
S
Shengliang Guan 已提交
522
      .reportStartupFp = dmReportStartupByWrapper,
S
Shengliang Guan 已提交
523 524 525
      .pWrapper = pWrapper,
  };
  return msgCb;
dengyihao's avatar
dengyihao 已提交
526
}