dndTransport.c 15.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17 18
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20
// Built-in credentials used by the dnode's own rpc client: INTERNAL_USER and
// INTERNAL_CKEY are passed to the rpc layer as-is, INTERNAL_SECRET is run
// through taosEncryptPass_c before use (see dndInitClient / dndGetHideUserAuth).
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
21
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
/*
 * Dispatch a message whose type is registered for more than one node kind.
 * The target wrapper is picked from the vgId stored in the message head:
 * QND_VGID selects pQndWrapper, MND_VGID selects pMndWrapper, anything else
 * goes to the default pWrapper.
 *
 * pHandle - handle entry of the message type; callers in this file only get
 *           here when pHandle->pWrapper is non-NULL
 * pMsg    - rpc message; when it carries content, the content is read as an
 *           SMsgHead
 * pEpSet  - forwarded unchanged to dndProcessRpcMsg
 */
static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SMgmtWrapper *pWrapper = pHandle->pWrapper;
  int32_t       vgId = 0;  // stays 0 (and is logged as such) when the message has no head

  // The response path does not check pCont before calling in, so a message
  // without content must not be dereferenced; it is routed to the default wrapper.
  if (pMsg->pCont != NULL) {
    SMsgHead *pHead = pMsg->pCont;
    vgId = ntohl(pHead->vgId);  // received field: network -> host byte order

    // Callers only guarantee that at least one of the two special wrappers is
    // registered, so fall back to the default one if the selected slot is empty
    // instead of dereferencing NULL below.
    if (vgId == QND_VGID && pHandle->pQndWrapper != NULL) {
      pWrapper = pHandle->pQndWrapper;
    } else if (vgId == MND_VGID && pHandle->pMndWrapper != NULL) {
      pWrapper = pHandle->pMndWrapper;
    }
  }

  dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
         pMsg->handle, pMsg->ahandle, vgId);
  dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}

S
Shengliang Guan 已提交
39
/*
 * Response callback of the client rpc endpoint (installed as rpcInit.cfp in
 * dndInitClient). Routes a response to the wrapper registered for its message
 * type, or frees its content when the dnode is not running or nobody handles
 * the type.
 */
static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) {
  tmsg_t msgType = pRsp->msgType;

  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
    dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
    rpcFreeCont(pRsp->pCont);
    return;
  }

  SMsgHandle *pEntry = &pDnode->trans.msgHandles[TMSG_INDEX(msgType)];

  // No default wrapper registered: the response cannot be delivered.
  if (pEntry->pWrapper == NULL) {
    dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
    rpcFreeCont(pRsp->pCont);
    return;
  }

  // The type is shared with a qnode and/or mnode handler: choose by vgId.
  if (pEntry->pMndWrapper != NULL || pEntry->pQndWrapper != NULL) {
    dndProcessQMVnodeRpcMsg(pEntry, pRsp, pEpSet);
    return;
  }

  dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
         pEntry->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
  dndProcessRpcMsg(pEntry->pWrapper, pRsp, pEpSet);
}

S
Shengliang Guan 已提交
64
/*
 * Open the rpc client endpoint of the dnode and store it in pDnode->trans.
 * Returns 0 on success, -1 when rpcOpen fails.
 */
static int32_t dndInitClient(SDnode *pDnode) {
  // rpcInit.secret points into this buffer, so it must outlive the rpcOpen call.
  char encSecret[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), encSecret);

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));

  // endpoint shape
  rpcInit.label = "DND";
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.numOfThreads = 1;
  rpcInit.sessions = 1024;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

  // callback and its context
  rpcInit.cfp = (RpcCfp)dndProcessResponse;
  rpcInit.parent = pDnode;

  // built-in credentials
  rpcInit.user = INTERNAL_USER;
  rpcInit.ckey = INTERNAL_CKEY;
  rpcInit.spi = 1;
  rpcInit.secret = encSecret;

  pDnode->trans.clientRpc = rpcOpen(&rpcInit);
  if (pDnode->trans.clientRpc == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
94
/*
 * Close the rpc client endpoint if it is open and clear the stored handle,
 * which makes repeated calls harmless.
 */
static void dndCleanupClient(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

S
Shengliang Guan 已提交
103
/*
 * Request callback of the server rpc endpoint (installed as rpcInit.cfp in
 * dndInitServer). Network-test requests are answered in any dnode state; all
 * other requests are rejected unless the dnode is running, the request has
 * content, and a wrapper is registered for the message type.
 */
static void dndProcessRequest(SDnode *pDnode, SRpcMsg *pReq, SEpSet *pEpSet) {
  tmsg_t msgType = pReq->msgType;

  // Handled before the status check so it works while the dnode is starting.
  if (msgType == TDMT_DND_NETWORK_TEST) {
    dTrace("network test req will be processed, handle:%p, app:%p", pReq->handle, pReq->ahandle);
    dndProcessStartupReq(pDnode, pReq);
    return;
  }

  if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
    dError("req:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
    SRpcMsg notReady = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
    rpcSendResponse(&notReady);
    rpcFreeCont(pReq->pCont);
    return;
  }

  if (pReq->pCont == NULL) {
    dTrace("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
    SRpcMsg badLen = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
    rpcSendResponse(&badLen);
    return;
  }

  SMsgHandle *pEntry = &pDnode->trans.msgHandles[TMSG_INDEX(msgType)];

  if (pEntry->pWrapper == NULL) {
    dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
    SRpcMsg noHandler = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
    rpcSendResponse(&noHandler);
    rpcFreeCont(pReq->pCont);
    return;
  }

  // The type is shared with a qnode and/or mnode handler: choose by vgId.
  if (pEntry->pMndWrapper != NULL || pEntry->pQndWrapper != NULL) {
    dndProcessQMVnodeRpcMsg(pEntry, pReq, pEpSet);
    return;
  }

  dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pEntry->pWrapper->name,
         pReq->handle, pReq->ahandle);
  dndProcessRpcMsg(pEntry->pWrapper, pReq, pEpSet);
}

/*
 * Send pRpcMsg through the client endpoint via rpcSendRecv, which fills
 * pRpcRsp, addressing the mnode endpoint set known to the DNODE wrapper.
 * If the DNODE wrapper cannot be acquired the endpoint set stays zeroed.
 */
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
  SEpSet mnodeEps = {0};

  SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pDnode, DNODE);
  if (pDnodeWrapper != NULL) {
    dmGetMnodeEpSet(pDnodeWrapper->pMgmt, &mnodeEps);
    dndReleaseWrapper(pDnodeWrapper);
  }

  rpcSendRecv(pDnode->trans.clientRpc, &mnodeEps, pRpcMsg, pRpcRsp);
}

S
shm  
Shengliang Guan 已提交
158 159 160 161
/*
 * Resolve credentials for the two built-in users without asking an mnode.
 *
 * user    - user name to look up
 * spi     - out: set to 1 on success
 * encrypt - out: set to 0 on success
 * secret  - out: receives TSDB_PASSWORD_LEN bytes of the encrypted password
 * ckey    - out: first byte set to 0 on success
 *
 * Returns 0 when user is INTERNAL_USER or TSDB_NETTEST_USER, -1 otherwise
 * (in which case no output is written).
 */
static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  const char *plain = NULL;

  if (strcmp(user, INTERNAL_USER) == 0) {
    plain = INTERNAL_SECRET;
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    // the network-test user uses its own name as password
    plain = TSDB_NETTEST_USER;
  }

  if (plain == NULL) {
    return -1;
  }

  char encPass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)plain, strlen(plain), encPass);

  memcpy(secret, encPass, TSDB_PASSWORD_LEN);
  *spi = 1;
  *encrypt = 0;
  *ckey = 0;
  return 0;
}

S
Shengliang Guan 已提交
180 181
/*
 * Authentication callback of the server rpc endpoint (installed as
 * rpcInit.afp in dndInitServer). Looks the user up in three places, in order:
 *   1. the built-in users (dndGetHideUserAuth),
 *   2. the mnode hosted by this dnode, if one can be acquired,
 *   3. other mnodes, via a synchronous TDMT_MND_AUTH request - only when
 *      terrno is TSDB_CODE_APP_NOT_READY after step 2.
 *
 * user                      - user name to authenticate
 * spi, encrypt, secret, ckey - outputs, written only on success
 *
 * Returns 0 on success, -1 on local failure, or the rpc response code of the
 * remote request.
 */
static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
    // NOTE(review): the trace text says "from mnode" although the credentials
    // came from the built-in users; text kept unchanged.
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
    return 0;
  }

  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
  if (pWrapper != NULL) {
    if (mmGetUserAuth(pWrapper, user, spi, encrypt, secret, ckey) == 0) {
      dndReleaseWrapper(pWrapper);
      dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
      return 0;
    }
    dndReleaseWrapper(pWrapper);
  }

  if (terrno != TSDB_CODE_APP_NOT_READY) {
    dTrace("failed to get user auth from mnode since %s", terrstr());
    return -1;
  }

  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
  void   *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    // Do not serialize into a NULL buffer; report the failure to the rpc layer.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
    return -1;
  }
  tSerializeSAuthReq(pReq, contLen, &authReq);

  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
  SRpcMsg rpcRsp = {0};
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
  dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
  } else {
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
230
/*
 * Open the rpc server endpoint of the dnode on pDnode->serverPort and store
 * it in pDnode->trans. The thread count is half of
 * tsNumOfCores * tsNumOfThreadsPerCore, but never less than 1.
 * Returns 0 on success, -1 when rpcOpen fails.
 */
static int32_t dndInitServer(SDnode *pDnode) {
  int32_t halfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));

  // endpoint shape
  rpcInit.label = "DND";
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.localPort = pDnode->serverPort;
  rpcInit.numOfThreads = (halfThreads < 1) ? 1 : halfThreads;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

  // callbacks and their context
  rpcInit.cfp = (RpcCfp)dndProcessRequest;
  rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo;
  rpcInit.parent = pDnode;

  pDnode->trans.serverRpc = rpcOpen(&rpcInit);
  if (pDnode->trans.serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
260
/*
 * Close the rpc server endpoint if it is open and clear the stored handle,
 * which makes repeated calls harmless.
 */
static void dndCleanupServer(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

S
Shengliang Guan 已提交
269 270 271 272 273 274 275 276 277 278 279
/*
 * Bring up the transport layer: the rpc server first, then the rpc client.
 * Returns 0 when both are open, -1 otherwise. On failure nothing is left
 * open: a server that was already started is closed again.
 */
int32_t dndInitTrans(SDnode *pDnode) {
  if (dndInitServer(pDnode) != 0) {
    return -1;
  }

  if (dndInitClient(pDnode) != 0) {
    // dndCleanupServer clears the handle, so a later dndCleanupTrans is still safe.
    dndCleanupServer(pDnode);
    return -1;
  }

  return 0;
}

/*
 * Tear down the transport layer: close the rpc server, then the rpc client.
 * Both helpers skip endpoints that are not open.
 */
void dndCleanupTrans(SDnode *pDnode) {
  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);
}

S
shm  
Shengliang Guan 已提交
280
/*
 * Build the message-type -> wrapper dispatch table in pDnode->trans.
 *
 * For every node wrapper and every message index with a registered handler,
 * the wrapper is stored in the slot selected by the handler's vgId:
 * QND_VGID -> pQndWrapper, MND_VGID -> pMndWrapper, otherwise pWrapper.
 *
 * Returns 0 on success, -1 if a slot is claimed twice.
 */
int32_t dndInitMsgHandle(SDnode *pDnode) {
  STransMgmt *pTrans = &pDnode->trans;

  for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    for (int32_t idx = 0; idx < TDMT_MAX; ++idx) {
      if (pNode->msgFps[idx] == NULL) {
        continue;
      }

      SMsgHandle    *pEntry = &pTrans->msgHandles[idx];
      int32_t        vgId = pNode->msgVgIds[idx];
      SMgmtWrapper **ppSlot = &pEntry->pWrapper;

      if (vgId == QND_VGID) {
        ppSlot = &pEntry->pQndWrapper;
      } else if (vgId == MND_VGID) {
        ppSlot = &pEntry->pMndWrapper;
      }

      if (*ppSlot != NULL) {
        dError("msg:%s has multiple process nodes", tMsgInfo[idx]);
        return -1;
      }
      *ppSlot = pNode;
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
319
/*
 * Send a request through the client rpc endpoint.
 * Returns 0 after handing the request to rpcSendRequest; returns -1 with
 * terrno = TSDB_CODE_DND_OFFLINE when the client endpoint is not open.
 */
static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (pMgmt->clientRpc != NULL) {
    rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL);
    return 0;
  }

  terrno = TSDB_CODE_DND_OFFLINE;
  return -1;
}

S
Shengliang Guan 已提交
329
/*
 * Send a response through the rpc layer. A TSDB_CODE_APP_NOT_READY response
 * produced by an MNODE wrapper is sent as a redirect via dmSendRedirectRsp
 * instead.
 */
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pRsp->code == TSDB_CODE_APP_NOT_READY && pWrapper->ntype == MNODE) {
    dmSendRedirectRsp(pWrapper->pMgmt, pRsp);
  } else {
    rpcSendResponse(pRsp);
  }
}

S
Shengliang Guan 已提交
340 341
/*
 * sendReqFp callback of SMsgCb: send a request on behalf of a node wrapper.
 *
 * Unless the wrapper is PROC_CHILD the request goes straight to the rpc
 * client. For PROC_CHILD the SRpcMsg and the SEpSet are packed back to back
 * into one head buffer and put on the parent queue as PROC_REQ, with the
 * request content as body.
 *
 * Returns 0 on success, -1 with terrno set when the dnode is not running, the
 * rpc client is unavailable, or the head buffer cannot be allocated.
 */
static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) {
    terrno = TSDB_CODE_DND_OFFLINE;
    dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
    return -1;
  }

  if (pWrapper->procType != PROC_CHILD) {
    return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq);
  }

  // Head layout: [SRpcMsg][SEpSet]
  char *pPacked = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
  if (pPacked == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  memcpy(pPacked, pReq, sizeof(SRpcMsg));
  memcpy(pPacked + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));

  taosProcPutToParentQ(pWrapper->pProc, pPacked, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
                       PROC_REQ);

  // The local copy is released right after the put.
  taosMemoryFree(pPacked);
  return 0;
}

S
Shengliang Guan 已提交
365
/*
 * sendRspFp callback of SMsgCb: a PROC_CHILD wrapper puts the response on the
 * parent queue as PROC_RSP, any other wrapper sends it via dndSendRpcRsp.
 */
static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
  if (pWrapper->procType == PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
    return;
  }

  dndSendRpcRsp(pWrapper, pRsp);
}
S
Shengliang Guan 已提交
372

S
Shengliang Guan 已提交
373
/*
 * registerBrokenLinkArgFp callback of SMsgCb: a PROC_CHILD wrapper puts the
 * message on the parent queue as PROC_REGIST, any other wrapper calls
 * rpcRegisterBrokenLinkArg directly.
 */
static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  if (pWrapper->procType == PROC_CHILD) {
    taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

S
shm  
Shengliang Guan 已提交
381
/*
 * releaseHandleFp callback of SMsgCb: a PROC_CHILD wrapper puts a body-less
 * PROC_RELEASE message on the parent queue, any other wrapper calls
 * rpcReleaseHandle directly.
 */
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
  if (pWrapper->procType == PROC_CHILD) {
    // The release type travels in the code field; dndConsumeParentQueue reads it back from there.
    SRpcMsg releaseMsg = {.handle = handle, .code = type};
    taosProcPutToParentQ(pWrapper->pProc, &releaseMsg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
    return;
  }

  rpcReleaseHandle(handle, type);
}

/*
 * Build the callback set for a node wrapper: the four transport callbacks of
 * this file plus the wrapper itself as their context. All other SMsgCb
 * members are zero.
 */
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb callbacks = {0};

  callbacks.pWrapper = pWrapper;
  callbacks.sendReqFp = dndSendReq;
  callbacks.sendRspFp = dndSendRsp;
  callbacks.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dndReleaseHandle;

  return callbacks;
}
S
Shengliang Guan 已提交
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463

/*
 * Consumer of the child queue (installed as childConsumeFp in dndGenProcCfg).
 * Attaches the body to the message and runs the handler registered for its
 * type. If the handler fails, a request (odd msgType) is answered with terrno
 * as the error code, and the message and its body are freed here.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  NodeMsgFp handlerFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  int32_t   code = (*handlerFp)(pWrapper, pMsg);
  if (code == 0) {
    // Nothing is freed on success.
    return;
  }

  dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));

  const int isRequest = (pRpc->msgType & 1U) != 0;
  if (isRequest) {
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

/*
 * Consumer of the parent queue (installed as parentConsumeFp in
 * dndGenProcCfg). Performs, in the parent process, the rpc operation that a
 * PROC_CHILD wrapper queued:
 *   PROC_REGIST  - rpcRegisterBrokenLinkArg
 *   PROC_RELEASE - rpcReleaseHandle, the release type is carried in pMsg->code
 *   PROC_REQ     - dndSendRpcReq, the SEpSet is stored right after the SRpcMsg
 *   PROC_RSP     - dndSendRpcRsp
 * The head buffer pMsg is always freed before returning.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);

  switch (ftype) {
    case PROC_REGIST:
      rpcRegisterBrokenLinkArg(pMsg);
      break;
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
    case PROC_REQ: {
      const SEpSet *pEpSet = (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg));
      if (dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pMsg) != 0) {
        // dndSendRpcReq returns before reaching the rpc layer when it fails, so
        // the body (allocated with rpcMallocCont) would otherwise be leaked.
        dError("msg:%p, failed to send rpc req from parent queue since %s", pMsg, terrstr());
        rpcFreeCont(pCont);
      }
      break;
    }
    case PROC_RSP:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
    default:
      break;
  }
  taosMemoryFree(pMsg);
}

/*
 * Build the process-queue configuration for a node wrapper: consumer
 * callbacks plus the head/body allocators of the child and the parent side,
 * and the wrapper's shared memory, name and itself as parent context.
 * All other SProcCfg members are zero.
 */
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg procCfg = {0};

  // child side: heads are queue items, bodies are rpc content buffers
  procCfg.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue;
  procCfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  procCfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  procCfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // parent side: heads are plain heap blocks, bodies are rpc content buffers
  procCfg.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue;
  procCfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  procCfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  procCfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  procCfg.shm = pWrapper->shm;
  procCfg.pParent = pWrapper;
  procCfg.name = pWrapper->name;

  return procCfg;
}