dndTransport.c 16.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dndNode.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19 20
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
21
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
// Hand a new mnode endpoint set to the management object of the wrapper stored at NODE_BEGIN.
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SMgmtWrapper *pFirst = pDnode->wrappers + NODE_BEGIN;
  dmUpdateMnodeEpSet(pFirst->pMgmt, pEpSet);
}
27

S
Shengliang Guan 已提交
28 29 30 31
// Look up the handler registered in pWrapper for the type of pRpc.
// Returns NULL and sets terrno to TSDB_CODE_MSG_NOT_PROCESSED when no handler is registered.
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler != NULL) {
    return handler;
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return NULL;
}

S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// Fill pMsg from pRpc: copy the rpc message and the connection's user/ip/port.
// Connection info is only queried when the low bit of the message type is set; otherwise the
// user/ip/port fields are filled from a zeroed SRpcConnInfo.
// Returns 0 on success, -1 (terrno = TSDB_CODE_MND_NO_USER_FROM_CONN) when the query fails.
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo info = {0};

  if (pRpc->msgType & 1U) {
    if (rpcGetConnInfo(pRpc->handle, &info) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  pMsg->rpcMsg = *pRpc;
  pMsg->clientPort = info.clientPort;
  pMsg->clientIp = info.clientIp;
  memcpy(pMsg->user, info.user, TSDB_USER_LEN);
  return 0;
}

// Dispatch one rpc message to the node wrapper chosen by the caller.
//
// Steps: optionally refresh the mnode endpoint set (status responses only), mark the wrapper,
// resolve the handler, allocate and fill a queue item, then either call the handler directly
// (DND_PROC_SINGLE) or push the message to the child process queue (DND_PROC_PARENT).
//
// On success in parent mode the local queue item and rpc content are freed here, after the
// put to the child queue. In single mode neither is freed here once the handler returns 0.
// On any failure the item and content are freed here and, when the low bit of the message
// type is set, an error response is sent.
static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;
  NodeMsgFp msgFp = NULL;
  uint16_t  msgType = pRpc->msgType;
  bool      marked = false;  // true once dndMarkWrapper succeeded, so release stays paired with mark

  if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
    dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
  marked = true;

  if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
  if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
  if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;

  if (pWrapper->procType == DND_PROC_SINGLE) {
    dTrace("msg:%p, is created, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    code = (*msgFp)(pWrapper, pMsg);
  } else if (pWrapper->procType == DND_PROC_PARENT) {
    dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
                               PROC_REQ);
  } else {
    dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
    // NOTE(review): ASSERT(1) never fires; code stays -1 so the failure path below still runs.
    ASSERT(1);
  }

_OVER:
  if (code == 0) {
    if (pWrapper->procType == DND_PROC_PARENT) {
      dTrace("msg:%p, is freed in parent process", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
    if (msgType & 1U) {
      // Prefer the detailed error in terrno over the generic return value.
      if (terrno != 0) code = terrno;
      if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
        // For message types strictly between TDMT_MND_MSG and TDMT_VND_MSG, answer with a redirect.
        if (msgType > TDMT_MND_MSG && msgType < TDMT_VND_MSG) {
          code = TSDB_CODE_NODE_REDIRECT;
        }
      }

      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
      tmsgSendRsp(&rsp);
    }
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  // Only undo a mark that was actually taken; releasing after a failed mark would unbalance the pair.
  if (marked) {
    dndReleaseWrapper(pWrapper);
  }
}

// Entry point for every incoming rpc message (installed as the rpc callback by the init functions).
//
// Order of checks:
//   1. network-test messages go straight to dndProcessStartupReq;
//   2. messages are rejected while the dnode is not running;
//   3. requests without content are rejected;
//   4. the target wrapper is selected: the default handler, or the qnode/mnode handler when the
//      message head carries QNODE_HANDLE / MNODE_HANDLE as vgId;
//   5. messages with no resulting wrapper are rejected;
//   6. everything else is passed to dndProcessRpcMsg.
// Every rejection answers requests (low bit of msgType set) with an error code and returns; the
// rejections at steps 2 and 5 also free pMsg->pCont first.
static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  tmsg_t        msgType = pMsg->msgType;
  bool          isReq = msgType & 1u;
  SMsgHandle   *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
  SMgmtWrapper *pWrapper = pHandle->pNdWrapper;

  if (msgType == TDMT_DND_NETWORK_TEST) {
    dTrace("network test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
    dndProcessStartupReq(pDnode, pMsg);
    return;
  }

  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
    dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (isReq && pMsg->pCont == NULL) {
    dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle};
    rpcSendResponse(&rspMsg);
    return;
  }

  // Pick the qnode/mnode handler from the vgId in the message head. This runs before the NULL
  // check below so that the check sees the final wrapper, and it is skipped when there is no
  // content to read a head from (responses may arrive without content).
  if (pMsg->pCont != NULL && (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL)) {
    SMsgHead *pHead = pMsg->pCont;
    int32_t   vgId = ntohl(pHead->vgId);
    if (vgId == QNODE_HANDLE) {
      pWrapper = pHandle->pQndWrapper;
    } else if (vgId == MNODE_HANDLE) {
      pWrapper = pHandle->pMndWrapper;
    }
  }

  if (pWrapper == NULL) {
    dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
    if (isReq) {
      SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle};
      rpcSendResponse(&rspMsg);
    }
    rpcFreeCont(pMsg->pCont);
    // Stop here: the content is freed and there is no wrapper to dispatch to.
    return;
  }

  dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
  dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}

S
Shengliang Guan 已提交
161
// Open the dnode's rpc client endpoint and store it in pDnode->trans.clientRpc.
// The client authenticates as INTERNAL_USER with the encrypted INTERNAL_SECRET.
// Returns 0 on success, -1 when rpcOpen fails.
static int32_t dndInitClient(SDnode *pDnode) {
  char encrypted[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), encrypted);

  // Fields not listed here are zero-initialized.
  SRpcInit cfg = {
      .label = "DND",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dndProcessMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .user = INTERNAL_USER,
      .ckey = INTERNAL_CKEY,
      .spi = 1,
      .parent = pDnode,
      .secret = encrypted,
  };

  pDnode->trans.clientRpc = rpcOpen(&cfg);
  if (pDnode->trans.clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client");
  return -1;
}

S
Shengliang Guan 已提交
191
// Close the rpc client endpoint if it is open and clear the stored handle.
static void dndCleanupClient(SDnode *pDnode) {
  if (pDnode->trans.clientRpc == NULL) {
    return;
  }

  rpcClose(pDnode->trans.clientRpc);
  pDnode->trans.clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

S
Shengliang Guan 已提交
200 201
// Send pReq through the client endpoint to the current mnode endpoint set via rpcSendRecv,
// which fills pRsp.
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SEpSet mnodeEps = {0};
  dmGetMnodeEpSet(pDnode->wrappers[NODE_BEGIN].pMgmt, &mnodeEps);

  rpcSendRecv(pDnode->trans.clientRpc, &mnodeEps, pReq, pRsp);
}

S
Shengliang Guan 已提交
207 208
// Produce auth info for the two built-in users without asking an mnode.
// INTERNAL_USER uses the encrypted INTERNAL_SECRET; TSDB_NETTEST_USER uses its own encrypted name.
// On a match writes secret (TSDB_PASSWORD_LEN bytes), *spi = 1, *encrypt = 0, *ckey = 0 and
// returns 0. For any other user returns -1 and leaves the outputs untouched.
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                         char *ckey) {
  const char *plain = NULL;
  if (strcmp(user, INTERNAL_USER) == 0) {
    plain = INTERNAL_SECRET;
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    plain = TSDB_NETTEST_USER;
  } else {
    return -1;
  }

  char encrypted[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)plain, strlen(plain), encrypted);

  memcpy(secret, encrypted, TSDB_PASSWORD_LEN);
  *spi = 1;
  *encrypt = 0;
  *ckey = 0;
  return 0;
}

S
Shengliang Guan 已提交
230 231
// Auth callback of the rpc server: resolve spi/encrypt/secret/ckey for a user.
// Built-in users are answered locally; every other user is looked up with a synchronous
// TDMT_MND_AUTH request to the mnode endpoint set.
// Returns 0 on success, the response code of the auth request on failure, or -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when the request buffer cannot be allocated.
static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
    return 0;
  }

  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);

  // First call with a NULL buffer only computes the serialized length.
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
  void   *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    // Do not serialize into, or send, a missing buffer.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
    return -1;
  }
  tSerializeSAuthReq(pReq, contLen, &authReq);

  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
  SRpcMsg rpcRsp = {0};
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
  dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
  } else {
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
265
// Open the dnode's rpc server endpoint on the configured server port and store it in
// pDnode->trans.serverRpc. Incoming messages go to dndProcessMsg; user auth goes to
// dndRetrieveUserAuthInfo.
// Returns 0 on success, -1 when rpcOpen fails.
static int32_t dndInitServer(SDnode *pDnode) {
  // Fields not listed here are zero-initialized.
  SRpcInit cfg = {
      .localPort = pDnode->data.serverPort,
      .label = "DND",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dndProcessMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .afp = (RpcAfp)dndRetrieveUserAuthInfo,
      .parent = pDnode,
  };

  pDnode->trans.serverRpc = rpcOpen(&cfg);
  if (pDnode->trans.serverRpc != NULL) {
    dDebug("dnode rpc server is initialized");
    return 0;
  }

  dError("failed to init dnode rpc server");
  return -1;
}

S
Shengliang Guan 已提交
290
// Close the rpc server endpoint if it is open and clear the stored handle.
static void dndCleanupServer(SDnode *pDnode) {
  if (pDnode->trans.serverRpc == NULL) {
    return;
  }

  rpcClose(pDnode->trans.serverRpc);
  pDnode->trans.serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

S
Shengliang Guan 已提交
299 300 301 302 303 304 305 306 307 308 309
// Bring up the transport layer: server endpoint first, then client endpoint.
// Returns 0 when both are open, -1 as soon as one fails (the client is not attempted if the
// server fails).
int32_t dndInitTrans(SDnode *pDnode) {
  if (dndInitServer(pDnode) != 0 || dndInitClient(pDnode) != 0) {
    return -1;
  }

  return 0;
}

// Tear down the transport layer: server endpoint first, then client endpoint.
// Both helpers tolerate an endpoint that was never opened.
void dndCleanupTrans(SDnode *pDnode) {
  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);
}

S
shm  
Shengliang Guan 已提交
310
// Build the per-message-type dispatch table in pDnode->trans.msgHandles from the handler
// tables of every node wrapper. Each registered handler lands in one of three slots of its
// message's entry, chosen by the wrapper's vgId for that message: qnode slot (QNODE_HANDLE),
// mnode slot (MNODE_HANDLE), or the default slot.
// Returns 0 on success, -1 when two wrappers claim the same slot of the same message.
int32_t dndInitMsgHandle(SDnode *pDnode) {
  SMsgHandle *pHandles = pDnode->trans.msgHandles;

  for (EDndNodeType n = 0; n < NODE_END; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];

    for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
      if (pWrapper->msgFps[msgIndex] == NULL) continue;

      int8_t         vgId = pWrapper->msgVgIds[msgIndex];
      SMsgHandle    *pHandle = &pHandles[msgIndex];
      SMgmtWrapper **ppSlot = NULL;

      if (vgId == QNODE_HANDLE) {
        ppSlot = &pHandle->pQndWrapper;
      } else if (vgId == MNODE_HANDLE) {
        ppSlot = &pHandle->pMndWrapper;
      } else {
        ppSlot = &pHandle->pNdWrapper;
      }

      if (*ppSlot != NULL) {
        dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
        return -1;
      }
      *ppSlot = pWrapper;
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
347 348
// Send a request through the client endpoint.
// Returns 0 after handing the request to rpcSendRequest, or -1 with
// terrno = TSDB_CODE_NODE_OFFLINE when the client endpoint is not open.
static int32_t dndSendRpcReq(SDnodeTrans *pTrans, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (pTrans->clientRpc != NULL) {
    rpcSendRequest(pTrans->clientRpc, pEpSet, pReq, NULL);
    return 0;
  }

  terrno = TSDB_CODE_NODE_OFFLINE;
  return -1;
}

S
Shengliang Guan 已提交
357
// Send a response over rpc. Redirect responses are handed to dmSendRedirectRsp; all others go
// straight to rpcSendResponse.
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pRsp->code != TSDB_CODE_NODE_REDIRECT) {
    rpcSendResponse(pRsp);
    return;
  }

  dmSendRedirectRsp(pWrapper->pMgmt, pRsp);
}

S
Shengliang Guan 已提交
365 366
// sendReqFp callback: send a request on behalf of a node.
// Outside a child process the request goes directly through the client endpoint. Inside a
// child process the request head and endpoint set are packed back to back into one buffer and
// queued to the parent, which unpacks them in dndConsumeParentQueue.
// Returns 0 on success, -1 with terrno set when the dnode is not running, the client endpoint
// is closed, or the packing buffer cannot be allocated.
static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) {
    terrno = TSDB_CODE_NODE_OFFLINE;
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
    return -1;
  }

  if (pWrapper->procType != DND_PROC_CHILD) {
    return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq);
  }

  char *pPacked = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
  if (pPacked == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // Layout: [SRpcMsg][SEpSet]
  memcpy(pPacked, pReq, sizeof(SRpcMsg));
  memcpy(pPacked + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
  taosProcPutToParentQ(pWrapper->procObj, pPacked, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
                       PROC_REQ);
  taosMemoryFree(pPacked);
  return 0;
}

S
Shengliang Guan 已提交
390
// sendRspFp callback: send a response on behalf of a node.
// A child process queues the response to its parent; any other process type sends it over rpc.
static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pWrapper->procType == DND_PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
    return;
  }

  dndSendRpcRsp(pWrapper, pRsp);
}
S
Shengliang Guan 已提交
397

S
Shengliang Guan 已提交
398
// registerBrokenLinkArgFp callback: register pMsg as broken-link argument.
// A child process queues the registration to its parent; any other process type calls
// rpcRegisterBrokenLinkArg directly.
static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  if (pWrapper->procType == DND_PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

S
shm  
Shengliang Guan 已提交
406
// releaseHandleFp callback: release an rpc handle.
// A child process queues the release to its parent, carrying the handle and the release type
// (in the code field) inside an otherwise empty SRpcMsg; any other process type calls
// rpcReleaseHandle directly.
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
  if (pWrapper->procType == DND_PROC_CHILD) {
    SRpcMsg carrier = {.handle = handle, .code = type};
    taosProcPutToParentQ(pWrapper->procObj, &carrier, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
    return;
  }

  rpcReleaseHandle(handle, type);
}

// Build the callback table a node uses to talk to the transport layer, bound to pWrapper.
// Fields not set here are zero.
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb callbacks = {0};

  callbacks.pWrapper = pWrapper;
  callbacks.sendReqFp = dndSendReq;
  callbacks.sendRspFp = dndSendRsp;
  callbacks.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dndReleaseHandle;

  return callbacks;
}
S
Shengliang Guan 已提交
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450

// Consumer for the child-process queue: runs the handler registered for the message in the
// child. pMsg is the queue item head, pCont the message body; msgLen, contLen and ftype are
// unused here.
// When the handler fails (or none is registered) an error response is sent for requests (low
// bit of msgType set) and both the item and the body are freed. On success nothing is freed
// here.
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  int32_t   code = -1;
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (msgFp != NULL) {
    code = (*msgFp)(pWrapper, pMsg);
  } else {
    // Never call through a missing handler; report it the same way dndGetMsgFp does.
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  }

  if (code != 0) {
    // Prefer the detailed error in terrno, but never answer a failed request with code 0.
    if (terrno != 0) code = terrno;

    dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
      dndSendRsp(pWrapper, &rsp);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pCont);
  }
}

// Consumer for the parent-process queue: performs, in the parent, the rpc operation a child
// asked for. pMsg is the queued head (an SRpcMsg, followed by an SEpSet for PROC_REQ), pCont
// the message body; msgLen and contLen are unused here. The head is always freed at the end;
// the body is freed here only for PROC_RELEASE.
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle,
         pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle);

  if (ftype == PROC_REGIST) {
    rpcRegisterBrokenLinkArg(pMsg);
  } else if (ftype == PROC_RELEASE) {
    // The release type travels in the code field (see dndReleaseHandle).
    taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
    rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
    rpcFreeCont(pCont);
  } else if (ftype == PROC_REQ) {
    // The endpoint set sits directly behind the SRpcMsg head (see dndSendReq).
    SEpSet *pPackedEpSet = (SEpSet *)(pMsg + 1);
    (void)dndSendRpcReq(&pWrapper->pDnode->trans, pPackedEpSet, pMsg);
  } else if (ftype == PROC_RSP) {
    taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
    dndSendRpcRsp(pWrapper, pMsg);
  }

  taosMemoryFree(pMsg);
}

// Build the process-queue configuration for pWrapper: consumers and head/body allocators for
// the child and parent queues, plus the wrapper's shared memory, name and back pointer.
// Fields not set here are zero.
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg procCfg = {0};

  // Child side: heads are queue items, bodies are rpc content buffers.
  procCfg.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue;
  procCfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  procCfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  procCfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // Parent side: heads are plain heap blocks, bodies are rpc content buffers.
  procCfg.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue;
  procCfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  procCfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  procCfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  procCfg.shm = pWrapper->procShm;
  procCfg.parent = pWrapper;
  procCfg.name = pWrapper->name;

  return procCfg;
}

// Pass pReq, the caller-supplied endpoint set and pRsp to rpcSendRecv on the dnode's client
// endpoint.
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SDnodeTrans *pTrans = &pDnode->trans;
  rpcSendRecv(pTrans->clientRpc, pEpSet, pReq, pRsp);
}