dndTransport.c 16.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
  
S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17 18
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
21
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23 24 25 26
/* Hand a new mnode endpoint set to the management object of the DNODE wrapper. */
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  dmUpdateMnodeEpSet(pDnode->wrappers[DNODE].pMgmt, pEpSet);
}
27

S
Shengliang Guan 已提交
28 29 30 31
/*
 * Look up the handler registered in pWrapper for the message type of pRpc.
 *
 * Returns the handler, or NULL with terrno set to TSDB_CODE_MSG_NOT_PROCESSED
 * when the wrapper has no handler in that slot.
 */
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler != NULL) {
    return handler;
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return NULL;
}

S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
/*
 * Fill pMsg from an incoming RPC message.
 *
 * For message types with the low bit set, the connection info (user, client
 * ip/port) is fetched from the RPC handle; a lookup failure sets terrno to
 * TSDB_CODE_MND_NO_USER_FROM_CONN and returns -1. For other message types the
 * connection fields are left zeroed. The SRpcMsg itself is copied by value.
 *
 * Returns 0 on success, -1 on failure.
 */
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo connInfo = {0};

  if (pRpc->msgType & 1U) {
    if (rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  pMsg->rpcMsg = *pRpc;
  pMsg->clientPort = connInfo.clientPort;
  pMsg->clientIp = connInfo.clientIp;
  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  return 0;
}

/*
 * Dispatch one RPC message to the node module behind pWrapper.
 *
 * - A TDMT_MND_STATUS_RSP that carries a non-empty endpoint set first updates
 *   the cached mnode endpoint set.
 * - PROC_SINGLE: the handler is called directly in this process.
 * - PROC_PARENT: the message is pushed to the child process queue; on success
 *   the local copy of the head and the body are freed here.
 * - Any failure: requests (low bit of msgType set) get an error response with
 *   terrno as the code, and the head and body are freed.
 *
 * The wrapper is released only if it was successfully marked, so a failed
 * dndMarkWrapper() no longer leads to an unmatched dndReleaseWrapper().
 */
static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;
  NodeMsgFp msgFp = NULL;
  bool      needRelease = false;

  if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
    dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
  needRelease = true;  // from here on the mark must be paired with a release

  if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
  if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
  if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;

  if (pWrapper->procType == PROC_SINGLE) {
    dTrace("msg:%p, is created, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    code = (*msgFp)(pWrapper, pMsg);
  } else if (pWrapper->procType == PROC_PARENT) {
    dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
                               PROC_REQ);
  } else {
    dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    // NOTE(review): ASSERT(1) never fires; code stays -1 so the message is
    // rejected through the error path below. Confirm whether ASSERT(0) was intended.
    ASSERT(1);
  }

_OVER:
  if (code == 0) {
    // In PROC_SINGLE mode nothing is freed here after a successful handler call.
    if (pWrapper->procType == PROC_PARENT) {
      dTrace("msg:%p, is freed in parent process", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      tmsgSendRsp(&rsp);
    }
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  if (needRelease) {
    dndReleaseWrapper(pWrapper);
  }
}

/*
 * RPC entry point for both the server and the client endpoint (installed as
 * rpcInit.cfp). Validates the message, picks the wrapper that should handle
 * it and forwards it to dndProcessRpcMsg().
 *
 * Rejection paths (requests, i.e. msgType with the low bit set, get an error
 * response; the body is freed when present):
 *   - dnode not in DND_STAT_RUNNING  -> TSDB_CODE_APP_NOT_READY
 *   - request without a body         -> TSDB_CODE_DND_INVALID_MSG_LEN
 *   - no wrapper registered          -> TSDB_CODE_MSG_NOT_PROCESSED
 *
 * TDMT_DND_NETWORK_TEST is answered directly regardless of dnode status.
 *
 * Wrapper selection: when a qnode or mnode wrapper is registered for the
 * message type, the vgId in the message head selects it (QNODE_HANDLE /
 * MNODE_HANDLE); any other vgId keeps the default wrapper. The selection is
 * done BEFORE the "no wrapper" check and that check returns, so a missing
 * wrapper can no longer lead to reading the freed body or dereferencing a
 * NULL wrapper.
 */
static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  STransMgmt   *pMgmt = &pDnode->trans;
  tmsg_t        msgType = pMsg->msgType;
  bool          isReq = msgType & 1u;
  SMsgHandle   *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
  SMgmtWrapper *pWrapper = pHandle->pWrapper;

  if (msgType == TDMT_DND_NETWORK_TEST) {
    dTrace("network test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
    dndProcessStartupReq(pDnode, pMsg);
    return;
  }

  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
    dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (isReq && pMsg->pCont == NULL) {
    dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pMsg->ahandle};
    rpcSendResponse(&rspMsg);
    return;
  }

  // The head is only inspected when there is a body; a response may arrive without one.
  if (pMsg->pCont != NULL && (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL)) {
    SMsgHead *pHead = pMsg->pCont;
    int32_t   vgId = ntohl(pHead->vgId);
    if (vgId == QNODE_HANDLE) {
      pWrapper = pHandle->pQndWrapper;
    } else if (vgId == MNODE_HANDLE) {
      pWrapper = pHandle->pMndWrapper;
    }
  }

  if (pWrapper == NULL) {
    dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
    return;
  }

  dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
  dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}

S
Shengliang Guan 已提交
153
/*
 * Open the dnode's RPC client endpoint and store it in pDnode->trans.clientRpc.
 *
 * The client authenticates as the internal user (INTERNAL_USER / INTERNAL_CKEY)
 * with the encrypted form of INTERNAL_SECRET as its secret.
 *
 * Returns 0 on success, -1 if rpcOpen() fails.
 */
static int32_t dndInitClient(SDnode *pDnode) {
  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);

  SRpcInit rpcInit = {
      .label = "DND",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dndProcessMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .user = INTERNAL_USER,
      .ckey = INTERNAL_CKEY,
      .spi = 1,
      .parent = pDnode,
      .secret = pass,
  };

  STransMgmt *pTrans = &pDnode->trans;
  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client");
  return -1;
}

S
Shengliang Guan 已提交
183
/* Close the RPC client endpoint if it is open and clear the stored handle. */
static void dndCleanupClient(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;
  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

S
Shengliang Guan 已提交
192 193 194 195 196
/*
 * Send pReq to the mnode endpoint set currently known to the DNODE wrapper
 * via rpcSendRecv() on the client endpoint; the reply is stored in pRsp.
 */
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SEpSet mnodeEps = {0};
  dmGetMnodeEpSet(pDnode->wrappers[DNODE].pMgmt, &mnodeEps);
  rpcSendRecv(pDnode->trans.clientRpc, &mnodeEps, pReq, pRsp);
}

S
Shengliang Guan 已提交
199 200
/*
 * Resolve credentials for the built-in users without contacting an mnode.
 *
 * INTERNAL_USER uses the encrypted INTERNAL_SECRET; TSDB_NETTEST_USER uses the
 * encrypted user name itself. On a match TSDB_PASSWORD_LEN bytes are written
 * to secret, *spi is set to 1 and *encrypt and *ckey are set to 0.
 *
 * pDnode is not used. Returns 0 for a built-in user, -1 otherwise (outputs
 * are left untouched in that case).
 */
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                         char *ckey) {
  const char *plain = NULL;
  if (strcmp(user, INTERNAL_USER) == 0) {
    plain = INTERNAL_SECRET;
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    plain = TSDB_NETTEST_USER;
  }

  if (plain == NULL) {
    return -1;
  }

  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)plain, strlen(plain), pass);

  memcpy(secret, pass, TSDB_PASSWORD_LEN);
  *spi = 1;
  *encrypt = 0;
  *ckey = 0;
  return 0;
}

S
Shengliang Guan 已提交
222 223
/*
 * Authentication callback of the RPC server (installed as rpcInit.afp).
 *
 * Built-in users are resolved locally through dndGetHideUserAuth(). For any
 * other user a TDMT_MND_AUTH request is sent synchronously to the mnode and
 * spi / encrypt / secret / ckey are filled from the reply.
 *
 * Returns 0 on success. On failure returns a non-zero error code: the RPC
 * response code (also stored in terrno), or TSDB_CODE_OUT_OF_MEMORY when the
 * request buffer cannot be allocated.
 */
static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
    return 0;
  }

  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
  void   *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    // Without this check the serializer below would write through a NULL pointer.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tSerializeSAuthReq(pReq, contLen, &authReq);

  // NOTE(review): pReq is not freed here; presumably rpcSendRecv takes ownership — confirm.
  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
  SRpcMsg rpcRsp = {0};
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
  dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
  } else {
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
257
/*
 * Open the dnode's RPC server endpoint on pDnode->serverPort and store it in
 * pDnode->trans.serverRpc.
 *
 * The thread count is half of (cores * threads-per-core), with a minimum of 1.
 * Incoming messages go to dndProcessMsg; authentication is delegated to
 * dndRetrieveUserAuthInfo.
 *
 * Returns 0 on success, -1 if rpcOpen() fails.
 */
static int32_t dndInitServer(SDnode *pDnode) {
  int32_t workerThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
  if (workerThreads < 1) {
    workerThreads = 1;
  }

  SRpcInit rpcInit = {
      .localPort = pDnode->serverPort,
      .label = "DND",
      .numOfThreads = workerThreads,
      .cfp = (RpcCfp)dndProcessMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .afp = (RpcAfp)dndRetrieveUserAuthInfo,
      .parent = pDnode,
  };

  STransMgmt *pTrans = &pDnode->trans;
  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc != NULL) {
    dDebug("dnode rpc server is initialized");
    return 0;
  }

  dError("failed to init dnode rpc server");
  return -1;
}

S
Shengliang Guan 已提交
287
/* Close the RPC server endpoint if it is open and clear the stored handle. */
static void dndCleanupServer(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;
  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

S
Shengliang Guan 已提交
296 297 298 299 300 301 302 303 304 305 306
/*
 * Bring up the transport layer: the RPC server first, then the RPC client.
 * The client is not started when the server fails.
 * Returns 0 when both are initialized, -1 otherwise.
 */
int32_t dndInitTrans(SDnode *pDnode) {
  int32_t code = dndInitServer(pDnode);
  if (code == 0) {
    code = dndInitClient(pDnode);
  }
  return (code == 0) ? 0 : -1;
}

/* Tear down the transport layer: closes the RPC server, then the RPC client. */
void dndCleanupTrans(SDnode *pDnode) {
  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);
}

S
shm  
Shengliang Guan 已提交
307
/*
 * Build the per-message-type dispatch table in pDnode->trans.msgHandles from
 * the handlers each node wrapper has registered.
 *
 * For every (wrapper, message type) pair that has a handler, the wrapper is
 * stored in one of three slots chosen by the vgId registered with it:
 * QNODE_HANDLE -> pQndWrapper, MNODE_HANDLE -> pMndWrapper, anything else ->
 * pWrapper. A slot may be claimed only once.
 *
 * Returns 0 on success, -1 if two wrappers claim the same slot.
 */
int32_t dndInitMsgHandle(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;

  for (EDndType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];

    for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
      if (pWrapper->msgFps[msgIndex] == NULL) continue;

      int8_t      vgId = pWrapper->msgVgIds[msgIndex];
      SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex];

      // Pick the slot this registration belongs to.
      SMgmtWrapper **ppSlot = &pHandle->pWrapper;
      if (vgId == QNODE_HANDLE) {
        ppSlot = &pHandle->pQndWrapper;
      } else if (vgId == MNODE_HANDLE) {
        ppSlot = &pHandle->pMndWrapper;
      }

      if (*ppSlot != NULL) {
        dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
        return -1;
      }
      *ppSlot = pWrapper;
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
344
/*
 * Send a request through the dnode's RPC client endpoint.
 * Returns 0 after handing the request to rpcSendRequest(), or -1 with terrno
 * set to TSDB_CODE_DND_OFFLINE when the client endpoint is not open.
 */
static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (pMgmt->clientRpc != NULL) {
    rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL);
    return 0;
  }

  terrno = TSDB_CODE_DND_OFFLINE;
  return -1;
}

S
Shengliang Guan 已提交
354
/*
 * Send a response over RPC. A TSDB_CODE_APP_NOT_READY response from the MNODE
 * wrapper is sent through dmSendRedirectRsp() instead; every other response
 * goes out unchanged through rpcSendResponse().
 */
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pRsp->code == TSDB_CODE_APP_NOT_READY && pWrapper->ntype == MNODE) {
    dmSendRedirectRsp(pWrapper->pMgmt, pRsp);
  } else {
    rpcSendResponse(pRsp);
  }
}

S
Shengliang Guan 已提交
365 366
/*
 * sendReqFp callback: send a request on behalf of a node module.
 *
 * Outside a child process the request goes straight to the RPC client. In a
 * child process (PROC_CHILD) the SRpcMsg and the endpoint set are packed back
 * to back into one head buffer and pushed to the parent queue as PROC_REQ.
 *
 * Returns 0 on success; -1 with terrno set when the dnode is not running
 * (TSDB_CODE_DND_OFFLINE) or the head buffer cannot be allocated
 * (TSDB_CODE_OUT_OF_MEMORY), or whatever dndSendRpcReq() returns.
 */
static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) {
    terrno = TSDB_CODE_DND_OFFLINE;
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
    return -1;
  }

  if (pWrapper->procType != PROC_CHILD) {
    return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq);
  }

  char *pPacked = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
  if (pPacked == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // Layout: [SRpcMsg][SEpSet]
  memcpy(pPacked, pReq, sizeof(SRpcMsg));
  memcpy(pPacked + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
  taosProcPutToParentQ(pWrapper->pProc, pPacked, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
                       PROC_REQ);
  // The head is freed right after the enqueue call; presumably the queue copies it — confirm.
  taosMemoryFree(pPacked);
  return 0;
}

S
Shengliang Guan 已提交
390
/*
 * sendRspFp callback: send a response on behalf of a node module. In a child
 * process the response is pushed to the parent queue as PROC_RSP; otherwise
 * it is sent over RPC directly.
 */
static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pWrapper->procType == PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
    return;
  }

  dndSendRpcRsp(pWrapper, pRsp);
}
S
Shengliang Guan 已提交
397

S
Shengliang Guan 已提交
398
/*
 * registerBrokenLinkArgFp callback. In a child process the message is pushed
 * to the parent queue as PROC_REGIST; otherwise rpcRegisterBrokenLinkArg() is
 * called directly.
 */
static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  if (pWrapper->procType == PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

S
shm  
Shengliang Guan 已提交
406
/*
 * releaseHandleFp callback. In a child process the handle and the release
 * type (carried in the code field of an SRpcMsg) are pushed to the parent
 * queue as PROC_RELEASE; otherwise rpcReleaseHandle() is called directly.
 */
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
  if (pWrapper->procType == PROC_CHILD) {
    SRpcMsg relMsg = {.handle = handle, .code = type};
    taosProcPutToParentQ(pWrapper->pProc, &relMsg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
    return;
  }

  rpcReleaseHandle(handle, type);
}

/*
 * Build the message callback table for pWrapper, wiring the send-request,
 * send-response, register-broken-link and release-handle callbacks to this
 * file's implementations. All other fields are zero.
 */
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb callbacks = {0};
  callbacks.pWrapper = pWrapper;
  callbacks.sendReqFp = dndSendReq;
  callbacks.sendRspFp = dndSendRsp;
  callbacks.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dndReleaseHandle;
  return callbacks;
}
S
Shengliang Guan 已提交
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457

/*
 * Child-process queue consumer (installed as childConsumeFp).
 *
 * Attaches the body to the message head and calls the handler registered for
 * the message type. If the handler fails, requests (low bit of msgType set)
 * get an error response carrying terrno, and the head and body are freed.
 * Nothing is freed here when the handler succeeds. msgLen, contLen and ftype
 * are not used.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  int32_t   code = (*handler)(pWrapper, pMsg);
  if (code == 0) {
    return;
  }

  dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  if (pRpc->msgType & 1U) {
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

/*
 * Parent-process queue consumer (installed as parentConsumeFp). Performs the
 * RPC operation a child process asked for, selected by ftype:
 *   PROC_REGIST  - register the broken-link argument
 *   PROC_RELEASE - drop the handle from the proc, release it in RPC, free the body
 *   PROC_REQ     - send the request; the endpoint set follows the SRpcMsg in the head
 *   PROC_RSP     - drop the handle from the proc and send the response
 * Any other ftype is ignored. The head buffer is always freed at the end.
 * msgLen and contLen are not used.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);

  if (ftype == PROC_REGIST) {
    rpcRegisterBrokenLinkArg(pMsg);
  } else if (ftype == PROC_RELEASE) {
    taosProcRemoveHandle(pWrapper->pProc, pMsg->handle);
    rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
    rpcFreeCont(pCont);
  } else if (ftype == PROC_REQ) {
    SEpSet *pEpSet = (SEpSet *)((char *)pMsg + sizeof(SRpcMsg));
    dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pMsg);
  } else if (ftype == PROC_RSP) {
    taosProcRemoveHandle(pWrapper->pProc, pMsg->handle);
    dndSendRpcRsp(pWrapper, pMsg);
  }

  taosMemoryFree(pMsg);
}

/*
 * Build the process configuration for pWrapper: consumer callbacks and the
 * head/body allocators for the child and parent queues, plus the wrapper's
 * shared memory, name and the wrapper itself as parent. Unlisted fields are zero.
 */
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg procCfg = {0};

  // child side: heads are queue items, bodies are rpc buffers
  procCfg.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue;
  procCfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  procCfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  procCfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // parent side: heads are plain heap blocks, bodies are rpc buffers
  procCfg.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue;
  procCfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  procCfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  procCfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  procCfg.shm = pWrapper->shm;
  procCfg.parent = pWrapper;
  procCfg.name = pWrapper->name;
  return procCfg;
}