dndTransport.c 14.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17 18
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
21
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
24 25 26 27
  SMsgHead *pHead = pMsg->pCont;
  int32_t   vgId = htonl(pHead->vgId);

  SMgmtWrapper *pWrapper = pHandle->pWrapper;
S
Shengliang Guan 已提交
28 29 30 31
  if (vgId == QND_VGID) {
    pWrapper = pHandle->pQndWrapper;
  } else if (vgId == MND_VGID) {
    pWrapper = pHandle->pMndWrapper;
32 33 34 35 36 37 38
  }

  dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
         pMsg->handle, pMsg->ahandle, vgId);
  dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}

S
Shengliang Guan 已提交
39
static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) {
S
shm  
Shengliang Guan 已提交
40
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
41
  tmsg_t      msgType = pRsp->msgType;
S
Shengliang Guan 已提交
42

S
shm  
Shengliang Guan 已提交
43
  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
S
shm  
Shengliang Guan 已提交
44
    dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
S
Shengliang Guan 已提交
45
    rpcFreeCont(pRsp->pCont);
S
Shengliang Guan 已提交
46 47 48
    return;
  }

S
shm  
Shengliang Guan 已提交
49
  SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
S
Shengliang Guan 已提交
50 51
  if (pHandle->pWrapper != NULL) {
    if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) {
52 53 54 55
      dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
             pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
      dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
    } else {
S
Shengliang Guan 已提交
56
      dndProcessQMVnodeRpcMsg(pHandle, pRsp, pEpSet);
57
    }
S
Shengliang Guan 已提交
58
  } else {
S
shm  
Shengliang Guan 已提交
59
    dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
S
Shengliang Guan 已提交
60
    rpcFreeCont(pRsp->pCont);
S
Shengliang Guan 已提交
61 62 63
  }
}

S
Shengliang Guan 已提交
64
static int32_t dndInitClient(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
65
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
66 67 68

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
shm  
Shengliang Guan 已提交
69
  rpcInit.label = "DND";
S
Shengliang Guan 已提交
70
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
71
  rpcInit.cfp = (RpcCfp)dndProcessResponse;
S
Shengliang Guan 已提交
72
  rpcInit.sessions = 1024;
S
Shengliang Guan 已提交
73
  rpcInit.connType = TAOS_CONN_CLIENT;
S
Shengliang Guan 已提交
74
  rpcInit.idleTime = tsShellActivityTimer * 1000;
S
Shengliang Guan 已提交
75 76
  rpcInit.user = INTERNAL_USER;
  rpcInit.ckey = INTERNAL_CKEY;
S
Shengliang Guan 已提交
77
  rpcInit.spi = 1;
78
  rpcInit.parent = pDnode;
S
Shengliang Guan 已提交
79

S
Shengliang Guan 已提交
80 81
  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
S
Shengliang Guan 已提交
82 83
  rpcInit.secret = pass;

S
Shengliang Guan 已提交
84 85
  pMgmt->clientRpc = rpcOpen(&rpcInit);
  if (pMgmt->clientRpc == NULL) {
S
Shengliang 已提交
86
    dError("failed to init dnode rpc client");
S
Shengliang Guan 已提交
87 88 89
    return -1;
  }

90
  dDebug("dnode rpc client is initialized");
S
Shengliang Guan 已提交
91 92 93
  return 0;
}

S
Shengliang Guan 已提交
94
static void dndCleanupClient(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
95
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
96 97 98
  if (pMgmt->clientRpc) {
    rpcClose(pMgmt->clientRpc);
    pMgmt->clientRpc = NULL;
99
    dDebug("dnode rpc client is closed");
S
Shengliang Guan 已提交
100 101 102
  }
}

S
Shengliang Guan 已提交
103
static void dndProcessRequest(SDnode *pDnode, SRpcMsg *pReq, SEpSet *pEpSet) {
S
shm  
Shengliang Guan 已提交
104
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
105
  tmsg_t      msgType = pReq->msgType;
S
Shengliang Guan 已提交
106

H
Hongze Cheng 已提交
107
  if (msgType == TDMT_DND_NETWORK_TEST) {
S
shm  
Shengliang Guan 已提交
108
    dTrace("network test req will be processed, handle:%p, app:%p", pReq->handle, pReq->ahandle);
S
shm  
Shengliang Guan 已提交
109
    dndProcessStartupReq(pDnode, pReq);
S
Shengliang Guan 已提交
110 111 112
    return;
  }

S
shm  
Shengliang Guan 已提交
113
  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
S
shm  
Shengliang Guan 已提交
114
    dError("req:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
S
Shengliang 已提交
115
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
116
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
117
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
118 119 120
    return;
  }

S
Shengliang Guan 已提交
121
  if (pReq->pCont == NULL) {
S
shm  
Shengliang Guan 已提交
122
    dTrace("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
S
Shengliang 已提交
123
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
124 125 126 127
    rpcSendResponse(&rspMsg);
    return;
  }

S
shm  
Shengliang Guan 已提交
128
  SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
S
Shengliang Guan 已提交
129 130
  if (pHandle->pWrapper != NULL) {
    if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) {
131 132 133 134
      dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
             pReq->handle, pReq->ahandle);
      dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
    } else {
S
Shengliang Guan 已提交
135
      dndProcessQMVnodeRpcMsg(pHandle, pReq, pEpSet);
136
    }
S
Shengliang Guan 已提交
137
  } else {
S
shm  
Shengliang Guan 已提交
138
    dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
S
Shengliang 已提交
139
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
140
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
141
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
142 143 144
  }
}

S
Shengliang Guan 已提交
145 146 147 148 149
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SEpSet        epSet = {0};
  SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE];
  dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
  rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
S
Shengliang Guan 已提交
150 151
}

S
Shengliang Guan 已提交
152
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
S
shm  
Shengliang Guan 已提交
153 154 155
  int32_t code = 0;
  char    pass[TSDB_PASSWORD_LEN + 1] = {0};

S
Shengliang Guan 已提交
156
  if (strcmp(user, INTERNAL_USER) == 0) {
S
Shengliang Guan 已提交
157
    taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
S
Shengliang Guan 已提交
158
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
S
Shengliang Guan 已提交
159
    taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
S
shm  
Shengliang Guan 已提交
160 161 162 163 164
  } else {
    code = -1;
  }

  if (code == 0) {
165
    memcpy(secret, pass, TSDB_PASSWORD_LEN);
S
Shengliang Guan 已提交
166
    *spi = 1;
S
Shengliang Guan 已提交
167 168 169
    *encrypt = 0;
    *ckey = 0;
  }
S
shm  
Shengliang Guan 已提交
170 171

  return code;
S
Shengliang Guan 已提交
172 173
}

S
Shengliang Guan 已提交
174 175
static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
S
Shengliang 已提交
176
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
S
Shengliang Guan 已提交
177 178 179
    return 0;
  }

S
Shengliang Guan 已提交
180 181 182
  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
S
shm  
Shengliang Guan 已提交
183
  void   *pReq = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
184
  tSerializeSAuthReq(pReq, contLen, &authReq);
S
Shengliang Guan 已提交
185

S
Shengliang Guan 已提交
186
  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
S
Shengliang Guan 已提交
187
  SRpcMsg rpcRsp = {0};
S
Shengliang Guan 已提交
188
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
S
Shengliang Guan 已提交
189 190 191 192
  dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
S
Shengliang Guan 已提交
193
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
S
Shengliang Guan 已提交
194
  } else {
S
Shengliang Guan 已提交
195 196 197 198 199 200 201 202
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
S
Shengliang Guan 已提交
203 204 205 206 207 208
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
209
static int32_t dndInitServer(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
210
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
211

S
config  
Shengliang Guan 已提交
212
  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
S
Shengliang Guan 已提交
213 214 215 216 217 218
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
shm  
Shengliang Guan 已提交
219
  rpcInit.localPort = pDnode->serverPort;
S
shm  
Shengliang Guan 已提交
220
  rpcInit.label = "DND";
S
Shengliang Guan 已提交
221
  rpcInit.numOfThreads = numOfThreads;
S
Shengliang Guan 已提交
222
  rpcInit.cfp = (RpcCfp)dndProcessRequest;
S
Shengliang Guan 已提交
223
  rpcInit.sessions = tsMaxShellConns;
S
Shengliang Guan 已提交
224
  rpcInit.connType = TAOS_CONN_SERVER;
S
Shengliang Guan 已提交
225
  rpcInit.idleTime = tsShellActivityTimer * 1000;
S
Shengliang Guan 已提交
226
  rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo;
227
  rpcInit.parent = pDnode;
S
Shengliang Guan 已提交
228 229 230

  pMgmt->serverRpc = rpcOpen(&rpcInit);
  if (pMgmt->serverRpc == NULL) {
S
Shengliang 已提交
231
    dError("failed to init dnode rpc server");
S
Shengliang Guan 已提交
232 233 234
    return -1;
  }

235
  dDebug("dnode rpc server is initialized");
S
Shengliang Guan 已提交
236 237 238
  return 0;
}

S
Shengliang Guan 已提交
239
static void dndCleanupServer(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
240
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
241 242 243
  if (pMgmt->serverRpc) {
    rpcClose(pMgmt->serverRpc);
    pMgmt->serverRpc = NULL;
244
    dDebug("dnode rpc server is closed");
S
Shengliang Guan 已提交
245 246 247
  }
}

S
Shengliang Guan 已提交
248 249 250 251 252 253 254 255 256 257 258
int32_t dndInitTrans(SDnode *pDnode) {
  if (dndInitServer(pDnode) != 0) return -1;
  if (dndInitClient(pDnode) != 0) return -1;
  return 0;
}

void dndCleanupTrans(SDnode *pDnode) {
  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);
}

S
shm  
Shengliang Guan 已提交
259
int32_t dndInitMsgHandle(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
260
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
261

S
shm  
Shengliang Guan 已提交
262 263
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
264 265

    for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
S
shm  
Shengliang Guan 已提交
266
      NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
S
Shengliang Guan 已提交
267
      int8_t    vgId = pWrapper->msgVgIds[msgIndex];
S
shm  
Shengliang Guan 已提交
268
      if (msgFp == NULL) continue;
S
shm  
Shengliang Guan 已提交
269 270

      SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
S
Shengliang Guan 已提交
271 272 273 274 275 276 277 278 279 280 281 282
      if (vgId == QND_VGID) {
        if (pHandle->pQndWrapper != NULL) {
          dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
          return -1;
        }
        pHandle->pQndWrapper = pWrapper;
      } else if (vgId == MND_VGID) {
        if (pHandle->pMndWrapper != NULL) {
          dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
          return -1;
        }
        pHandle->pMndWrapper = pWrapper;
S
shm  
Shengliang Guan 已提交
283
      } else {
S
Shengliang Guan 已提交
284 285 286
        if (pHandle->pWrapper != NULL) {
          dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
          return -1;
287
        }
S
Shengliang Guan 已提交
288
        pHandle->pWrapper = pWrapper;
S
shm  
Shengliang Guan 已提交
289 290 291 292 293 294 295
      }
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
296
static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) {
S
Shengliang Guan 已提交
297 298 299 300 301
  if (pMgmt->clientRpc == NULL) {
    terrno = TSDB_CODE_DND_OFFLINE;
    return -1;
  }

S
Shengliang Guan 已提交
302
  rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL);
S
Shengliang Guan 已提交
303
  return 0;
S
Shengliang Guan 已提交
304 305
}

S
Shengliang Guan 已提交
306
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
S
shm  
Shengliang Guan 已提交
307
  if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
S
Shengliang Guan 已提交
308 309 310
    if (pWrapper->ntype == MNODE) {
      dmSendRedirectRsp(pWrapper->pMgmt, pRsp);
      return;
S
Shengliang Guan 已提交
311
    }
S
shm  
Shengliang Guan 已提交
312
  }
S
Shengliang Guan 已提交
313 314

  rpcSendResponse(pRsp);
S
shm  
Shengliang Guan 已提交
315 316
}

S
Shengliang Guan 已提交
317 318
static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) {
S
shm  
Shengliang Guan 已提交
319 320 321 322 323 324
    terrno = TSDB_CODE_DND_OFFLINE;
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
    return -1;
  }

  if (pWrapper->procType != PROC_CHILD) {
S
Shengliang Guan 已提交
325
    return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq);
S
shm  
Shengliang Guan 已提交
326
  } else {
S
Shengliang Guan 已提交
327
    char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
S
shm  
Shengliang Guan 已提交
328 329 330 331
    if (pHead == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
S
Shengliang Guan 已提交
332

S
shm  
Shengliang Guan 已提交
333 334
    memcpy(pHead, pReq, sizeof(SRpcMsg));
    memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
S
Shengliang Guan 已提交
335 336
    taosProcPutToParentQ(pWrapper->pProc, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
                         PROC_REQ);
S
shm  
Shengliang Guan 已提交
337 338 339 340 341
    taosMemoryFree(pHead);
    return 0;
  }
}

S
Shengliang Guan 已提交
342
static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
S
shm  
Shengliang Guan 已提交
343
  if (pWrapper->procType != PROC_CHILD) {
S
shm  
Shengliang Guan 已提交
344
    dndSendRpcRsp(pWrapper, pRsp);
S
shm  
Shengliang Guan 已提交
345
  } else {
S
shm  
Shengliang Guan 已提交
346
    taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
S
shm  
Shengliang Guan 已提交
347
  }
D
dapan1121 已提交
348
}
S
Shengliang Guan 已提交
349

S
Shengliang Guan 已提交
350
static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
351
  if (pWrapper->procType != PROC_CHILD) {
S
Shengliang Guan 已提交
352
    rpcRegisterBrokenLinkArg(pMsg);
S
shm  
Shengliang Guan 已提交
353
  } else {
S
shm  
Shengliang Guan 已提交
354
    taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
S
shm  
Shengliang Guan 已提交
355 356 357
  }
}

S
shm  
Shengliang Guan 已提交
358
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
S
shm  
Shengliang Guan 已提交
359 360 361 362
  if (pWrapper->procType != PROC_CHILD) {
    rpcReleaseHandle(handle, type);
  } else {
    SRpcMsg msg = {.handle = handle, .code = type};
S
shm  
Shengliang Guan 已提交
363
    taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
S
Shengliang Guan 已提交
364
  }
S
shm  
Shengliang Guan 已提交
365 366 367 368 369
}

SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb msgCb = {
      .pWrapper = pWrapper,
S
shm  
Shengliang Guan 已提交
370 371
      .sendReqFp = dndSendReq,
      .sendRspFp = dndSendRsp,
S
shm  
Shengliang Guan 已提交
372 373 374 375
      .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
      .releaseHandleFp = dndReleaseHandle,
  };
  return msgCb;
S
Shengliang Guan 已提交
376
}
S
Shengliang Guan 已提交
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440

static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  int32_t   code = (*msgFp)(pWrapper, pMsg);

  if (code != 0) {
    dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pCont);
  }
}

static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);

  switch (ftype) {
    case PROC_REGIST:
      rpcRegisterBrokenLinkArg(pMsg);
      break;
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
    case PROC_REQ:
      dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
      break;
    case PROC_RSP:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
    default:
      break;
  }
  taosMemoryFree(pMsg);
}

SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
                  .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                  .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                  .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                  .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
                  .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
                  .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
                  .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                  .shm = pWrapper->shm,
                  .pParent = pWrapper,
                  .name = pWrapper->name};
  return cfg;
}