dndTransport.c 13.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17 18
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20
/*
 * Built-in credentials of the dnode itself.
 * dndInitClient uses them to configure the rpc client, and
 * dndGetHideUserAuth accepts INTERNAL_USER without asking an mnode.
 */
#define INTERNAL_USER "_dnd"   /* user name of the built-in account */
#define INTERNAL_CKEY "_key"   /* ckey handed to the rpc client */
#define INTERNAL_SECRET "_pwd" /* plain text that is encrypted into the secret */
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
/*
 * Dispatch a message whose type is registered by more than one node kind.
 *
 * The vgId stored in the message head picks the target: QND_VGID selects the
 * handle's qnode wrapper, MND_VGID selects its mnode wrapper, and any other
 * value keeps the handle's default wrapper.
 *
 * pHandle  handle entry of the message type; the callers in this file only
 *          get here when pHandle->pWrapper is not NULL
 * pMsg     rpc message, handed on to dndProcessRpcMsg
 * pEpSet   endpoint set, handed on unchanged
 */
static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SMgmtWrapper *pWrapper = pHandle->pWrapper;
  SMsgHead     *pHead = pMsg->pCont;
  int32_t       vgId = 0;

  // dndProcessResponse forwards messages without checking pCont, so the head
  // may be missing; in that case stay on the default wrapper (vgId logs as 0).
  if (pHead != NULL) {
    vgId = htonl(pHead->vgId);

    // Callers only guarantee that one of the two special wrappers is set, so
    // never switch to a wrapper that was not registered for this message type.
    if (vgId == QND_VGID && pHandle->pQndWrapper != NULL) {
      pWrapper = pHandle->pQndWrapper;
    } else if (vgId == MND_VGID && pHandle->pMndWrapper != NULL) {
      pWrapper = pHandle->pMndWrapper;
    }
  }

  dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
         pMsg->handle, pMsg->ahandle, vgId);
  dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}

S
Shengliang Guan 已提交
39
/*
 * Callback installed as rpcInit.cfp of the rpc client (see dndInitClient).
 *
 * parent  the SDnode that was stored in rpcInit.parent
 * pRsp    response message
 * pEpSet  endpoint set passed through to the processing function
 *
 * The response body is released here whenever the response is dropped
 * (dnode not running, or no handler registered for the message type).
 */
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
  SDnode *pDnode = parent;
  tmsg_t  msgType = pRsp->msgType;

  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
    dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
    rpcFreeCont(pRsp->pCont);
    return;
  }

  SMsgHandle *pHandle = &pDnode->trans.msgHandles[TMSG_INDEX(msgType)];

  if (pHandle->pWrapper == NULL) {
    dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
    rpcFreeCont(pRsp->pCont);
    return;
  }

  // The message type is shared with an mnode and/or qnode: route by vgId.
  if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
    dndProcessQMVnodeRpcMsg(pHandle, pRsp, pEpSet);
    return;
  }

  dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
         pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
  dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
}

S
shm  
Shengliang Guan 已提交
65
/*
 * Open the rpc client of the dnode and store it in pDnode->trans.clientRpc.
 *
 * The client authenticates as INTERNAL_USER, with a secret derived from
 * INTERNAL_SECRET, and delivers responses to dndProcessResponse.
 *
 * Returns 0 on success, -1 when rpcOpen fails.
 */
int32_t dndInitClient(SDnode *pDnode) {
  char secret[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), secret);

  SRpcInit rpcInit = {
      .label = "DND",
      .numOfThreads = 1,
      .cfp = dndProcessResponse,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .user = INTERNAL_USER,
      .ckey = INTERNAL_CKEY,
      .spi = 1,
      .parent = pDnode,
      .secret = secret,
  };

  void *pClient = rpcOpen(&rpcInit);
  pDnode->trans.clientRpc = pClient;
  if (pClient == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
95
/*
 * Close the rpc client opened by dndInitClient, if there is one, and clear
 * the stored pointer so that a second call does nothing.
 */
void dndCleanupClient(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;
  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

S
Shengliang Guan 已提交
104
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
S
shm  
Shengliang Guan 已提交
105
  SDnode     *pDnode = param;
S
shm  
Shengliang Guan 已提交
106
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
107
  tmsg_t      msgType = pReq->msgType;
S
Shengliang Guan 已提交
108

H
Hongze Cheng 已提交
109
  if (msgType == TDMT_DND_NETWORK_TEST) {
S
shm  
Shengliang Guan 已提交
110
    dTrace("network test req will be processed, handle:%p, app:%p", pReq->handle, pReq->ahandle);
S
shm  
Shengliang Guan 已提交
111
    dndProcessStartupReq(pDnode, pReq);
S
Shengliang Guan 已提交
112 113 114
    return;
  }

S
shm  
Shengliang Guan 已提交
115
  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
S
shm  
Shengliang Guan 已提交
116
    dError("req:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
S
Shengliang 已提交
117
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
118
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
119
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
120 121 122
    return;
  }

S
Shengliang Guan 已提交
123
  if (pReq->pCont == NULL) {
S
shm  
Shengliang Guan 已提交
124
    dTrace("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
S
Shengliang 已提交
125
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
126 127 128 129
    rpcSendResponse(&rspMsg);
    return;
  }

S
shm  
Shengliang Guan 已提交
130
  SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
S
Shengliang Guan 已提交
131 132
  if (pHandle->pWrapper != NULL) {
    if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) {
133 134 135 136
      dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
             pReq->handle, pReq->ahandle);
      dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
    } else {
S
Shengliang Guan 已提交
137
      dndProcessQMVnodeRpcMsg(pHandle, pReq, pEpSet);
138
    }
S
Shengliang Guan 已提交
139
  } else {
S
shm  
Shengliang Guan 已提交
140
    dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
S
Shengliang 已提交
141
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
142
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
143
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
144 145 146 147
  }
}

/*
 * Send pRpcMsg to the mnode endpoint set through the dnode's rpc client using
 * rpcSendRecv, which fills pRpcRsp.
 *
 * The endpoint set is taken from the DNODE wrapper; when that wrapper cannot
 * be acquired the request goes out with a zeroed endpoint set.
 */
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
  SEpSet mnodeEpSet = {0};

  SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pDnode, DNODE);
  if (pDnodeWrapper != NULL) {
    dmGetMnodeEpSet(pDnodeWrapper->pMgmt, &mnodeEpSet);
    dndReleaseWrapper(pDnodeWrapper);
  }

  rpcSendRecv(pDnode->trans.clientRpc, &mnodeEpSet, pRpcMsg, pRpcRsp);
}

S
shm  
Shengliang Guan 已提交
160 161 162 163
/*
 * Resolve the auth info of the two built-in users without contacting an mnode.
 *
 * INTERNAL_USER gets a secret derived from INTERNAL_SECRET; TSDB_NETTEST_USER
 * gets a secret derived from its own name. For both, *spi is set to 1 and
 * *encrypt and *ckey are set to 0.
 *
 * Returns 0 when user is one of the built-in users, -1 otherwise (the output
 * arguments are left untouched in that case). pDnode is not used.
 */
static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  const char *plain = NULL;

  if (strcmp(user, INTERNAL_USER) == 0) {
    plain = INTERNAL_SECRET;
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    plain = TSDB_NETTEST_USER;
  }

  if (plain == NULL) {
    return -1;
  }

  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)plain, strlen(plain), pass);

  memcpy(secret, pass, TSDB_PASSWORD_LEN);
  *spi = 1;
  *encrypt = 0;
  *ckey = 0;
  return 0;
}

S
Shengliang Guan 已提交
182 183 184
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  SDnode *pDnode = parent;

S
shm  
Shengliang Guan 已提交
185
  if (dndGetHideUserAuth(parent, user, spi, encrypt, secret, ckey) == 0) {
S
Shengliang 已提交
186
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
S
Shengliang Guan 已提交
187 188 189
    return 0;
  }

S
Shengliang Guan 已提交
190 191 192 193 194 195 196 197
  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
  if (pWrapper != NULL) {
    if (mmGetUserAuth(pWrapper, user, spi, encrypt, secret, ckey) == 0) {
      dndReleaseWrapper(pWrapper);
      dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
      return 0;
    }
    dndReleaseWrapper(pWrapper);
S
Shengliang Guan 已提交
198 199 200
  }

  if (terrno != TSDB_CODE_APP_NOT_READY) {
S
Shengliang 已提交
201
    dTrace("failed to get user auth from mnode since %s", terrstr());
S
Shengliang Guan 已提交
202
    return -1;
S
Shengliang Guan 已提交
203 204
  }

S
Shengliang Guan 已提交
205 206 207
  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
S
shm  
Shengliang Guan 已提交
208
  void   *pReq = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
209
  tSerializeSAuthReq(pReq, contLen, &authReq);
S
Shengliang Guan 已提交
210

S
Shengliang Guan 已提交
211
  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
S
Shengliang Guan 已提交
212
  SRpcMsg rpcRsp = {0};
S
Shengliang Guan 已提交
213
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
S
Shengliang Guan 已提交
214 215 216 217
  dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
S
Shengliang Guan 已提交
218
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
S
Shengliang Guan 已提交
219
  } else {
S
Shengliang Guan 已提交
220 221 222 223 224 225 226 227
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
S
Shengliang Guan 已提交
228 229 230 231 232 233
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
shm  
Shengliang Guan 已提交
234
/*
 * Open the rpc server of the dnode on pDnode->serverPort and store it in
 * pDnode->trans.serverRpc.
 *
 * Requests are delivered to dndProcessRequest and user authentication is
 * resolved through dndRetrieveUserAuthInfo. The thread count is half of
 * tsNumOfCores * tsNumOfThreadsPerCore, but never less than one.
 *
 * Returns 0 on success, -1 when rpcOpen fails.
 */
int32_t dndInitServer(SDnode *pDnode) {
  int32_t halfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);

  SRpcInit rpcInit = {
      .localPort = pDnode->serverPort,
      .label = "DND",
      .numOfThreads = (halfThreads < 1) ? 1 : halfThreads,
      .cfp = dndProcessRequest,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .afp = dndRetrieveUserAuthInfo,
      .parent = pDnode,
  };

  void *pServer = rpcOpen(&rpcInit);
  pDnode->trans.serverRpc = pServer;
  if (pServer == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
264
/*
 * Close the rpc server opened by dndInitServer, if there is one, and clear
 * the stored pointer so that a second call does nothing.
 */
void dndCleanupServer(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;
  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

S
shm  
Shengliang Guan 已提交
273
/*
 * Build the message dispatch table pDnode->trans.msgHandles from the
 * per-wrapper tables msgFps / msgVgIds.
 *
 * Each message type has three slots: one for QND_VGID, one for MND_VGID and a
 * default one for every other vgId. A slot may be claimed by one wrapper only.
 *
 * Returns 0 on success, -1 when two wrappers claim the same slot.
 */
int32_t dndInitMsgHandle(SDnode *pDnode) {
  SMsgHandle *handles = pDnode->trans.msgHandles;

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];

    for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
      if (pWrapper->msgFps[msgIndex] == NULL) {
        continue;  // this wrapper does not handle the message type
      }

      int32_t        vgId = pWrapper->msgVgIds[msgIndex];
      SMsgHandle    *pHandle = &handles[msgIndex];
      SMgmtWrapper **ppSlot = &pHandle->pWrapper;

      if (vgId == QND_VGID) {
        ppSlot = &pHandle->pQndWrapper;
      } else if (vgId == MND_VGID) {
        ppSlot = &pHandle->pMndWrapper;
      }

      if (*ppSlot != NULL) {
        dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
        return -1;
      }
      *ppSlot = pWrapper;
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
312
/*
 * Hand pReq to the rpc client for delivery to pEpSet.
 *
 * Returns 0 once the request was passed to rpcSendRequest, or -1 with terrno
 * set to TSDB_CODE_DND_OFFLINE when no rpc client is open.
 */
static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) {
  void *pClient = pMgmt->clientRpc;

  if (pClient != NULL) {
    rpcSendRequest(pClient, pEpSet, pReq, NULL);
    return 0;
  }

  terrno = TSDB_CODE_DND_OFFLINE;
  return -1;
}

S
shm  
Shengliang Guan 已提交
322
/*
 * Send pReq to the mnode endpoint set known to the DNODE wrapper.
 *
 * When the DNODE wrapper cannot be acquired the request is sent with a zeroed
 * endpoint set. Returns the result of dndSendRpcReq.
 */
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
  SDnode *pDnode = pWrapper->pDnode;
  SEpSet  mnodeEpSet = {0};

  SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pDnode, DNODE);
  if (pDnodeWrapper != NULL) {
    dmGetMnodeEpSet(pDnodeWrapper->pMgmt, &mnodeEpSet);
    dndReleaseWrapper(pDnodeWrapper);
  }

  return dndSendRpcReq(&pDnode->trans, &mnodeEpSet, pReq);
}

S
shm  
Shengliang Guan 已提交
335
/*
 * Send a response through the rpc layer.
 *
 * A response with code TSDB_CODE_APP_NOT_READY is handed to dmSendRedirectRsp
 * when the DNODE wrapper can be acquired; every other response, and the
 * not-ready one when the wrapper is unavailable, goes to rpcSendResponse.
 */
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  SMgmtWrapper *pDnodeWrapper = NULL;

  if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
    pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
  }

  if (pDnodeWrapper == NULL) {
    rpcSendResponse(pRsp);
    return;
  }

  dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
  dndReleaseWrapper(pDnodeWrapper);
}

S
shm  
Shengliang Guan 已提交
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
/*
 * Send a request on behalf of a node wrapper.
 *
 * pWrapper  wrapper issuing the request
 * pEpSet    destination endpoint set
 * pReq      request message
 *
 * When the wrapper does not run as a child process the request goes straight
 * to the rpc client. In a child process the request is forwarded to the parent
 * through the process queue; its head is the SRpcMsg immediately followed by
 * the SEpSet, so that the parent receives the destination as well.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_DND_OFFLINE
 * when the dnode is not running, or to TSDB_CODE_OUT_OF_MEMORY when the head
 * buffer cannot be allocated.
 */
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  SDnode *pDnode = pWrapper->pDnode;
  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
    terrno = TSDB_CODE_DND_OFFLINE;
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
    return -1;
  }

  if (pWrapper->procType != PROC_CHILD) {
    return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
  }

  int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet);
  char   *pHead = taosMemoryMalloc(headLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  memcpy(pHead, pReq, sizeof(SRpcMsg));
  memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));

  // Pass the combined head. Passing pReq with headLen would read sizeof(SEpSet)
  // bytes beyond the SRpcMsg and would never deliver the endpoint set.
  taosProcPutToParentQ(pWrapper->pProc, pHead, headLen, pReq->pCont, pReq->contLen, PROC_REQ);
  taosMemoryFree(pHead);
  return 0;
}

S
shm  
Shengliang Guan 已提交
375
/*
 * Send a response on behalf of a node wrapper: forwarded to the parent process
 * queue (PROC_RSP) when the wrapper runs as a child process, otherwise sent
 * through dndSendRpcRsp.
 */
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pWrapper->procType == PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
    return;
  }

  dndSendRpcRsp(pWrapper, pRsp);
}
S
Shengliang Guan 已提交
382 383

/*
 * Register pMsg as broken-link argument: forwarded to the parent process queue
 * (PROC_REGIST) when the wrapper runs as a child process, otherwise passed to
 * rpcRegisterBrokenLinkArg directly.
 */
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  if (pWrapper->procType == PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

S
shm  
Shengliang Guan 已提交
391
/*
 * Release an rpc handle: passed to rpcReleaseHandle directly, or, when the
 * wrapper runs as a child process, forwarded to the parent process queue
 * (PROC_RELEASE) in a body-less SRpcMsg whose code field carries the type.
 */
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
  if (pWrapper->procType == PROC_CHILD) {
    SRpcMsg releaseMsg = {0};
    releaseMsg.handle = handle;
    releaseMsg.code = type;
    taosProcPutToParentQ(pWrapper->pProc, &releaseMsg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
    return;
  }

  rpcReleaseHandle(handle, type);
}

/*
 * Build the callback table bound to pWrapper. All members that are not set
 * below are zero.
 */
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb callbacks = {0};

  callbacks.pWrapper = pWrapper;
  callbacks.sendReqFp = dndSendReq;
  callbacks.sendRspFp = dndSendRsp;
  callbacks.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dndReleaseHandle;

  return callbacks;
}