dndTransport.c 10.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17 18
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20
#define INTERNAL_USER   "_dnd"
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
21
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
S
shm  
Shengliang Guan 已提交
24
  SDnode     *pDnode = parent;
S
shm  
Shengliang Guan 已提交
25
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
26
  tmsg_t      msgType = pRsp->msgType;
S
Shengliang Guan 已提交
27

S
shm  
Shengliang Guan 已提交
28
  if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
S
Shengliang Guan 已提交
29
    if (pRsp == NULL || pRsp->pCont == NULL) return;
S
shm  
Shengliang Guan 已提交
30
    dTrace("rsp:%s ignored since dnode exiting, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
S
Shengliang Guan 已提交
31
    rpcFreeCont(pRsp->pCont);
S
Shengliang Guan 已提交
32 33 34
    return;
  }

S
shm  
Shengliang Guan 已提交
35
  SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
S
shm  
Shengliang Guan 已提交
36
  if (pHandle->msgFp != NULL) {
S
shm  
Shengliang Guan 已提交
37 38
    dTrace("rsp:%s will be processed by %s, app:%p code:0x%x:%s", TMSG_INFO(msgType), pHandle->pWrapper->name,
           pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
S
shm  
Shengliang Guan 已提交
39
    dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
S
Shengliang Guan 已提交
40
  } else {
S
shm  
Shengliang Guan 已提交
41
    dError("rsp:%s not processed, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
S
Shengliang Guan 已提交
42
    rpcFreeCont(pRsp->pCont);
S
Shengliang Guan 已提交
43 44 45
  }
}

S
shm  
Shengliang Guan 已提交
46
int32_t dndInitClient(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
47
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
48 49 50

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
shm  
Shengliang Guan 已提交
51
  rpcInit.label = "DND";
S
Shengliang Guan 已提交
52 53
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = dndProcessResponse;
S
Shengliang Guan 已提交
54
  rpcInit.sessions = 1024;
S
Shengliang Guan 已提交
55
  rpcInit.connType = TAOS_CONN_CLIENT;
S
Shengliang Guan 已提交
56
  rpcInit.idleTime = tsShellActivityTimer * 1000;
S
Shengliang Guan 已提交
57 58
  rpcInit.user = INTERNAL_USER;
  rpcInit.ckey = INTERNAL_CKEY;
S
Shengliang Guan 已提交
59
  rpcInit.spi = 1;
60
  rpcInit.parent = pDnode;
S
Shengliang Guan 已提交
61

S
Shengliang Guan 已提交
62 63
  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
S
Shengliang Guan 已提交
64 65
  rpcInit.secret = pass;

S
Shengliang Guan 已提交
66 67
  pMgmt->clientRpc = rpcOpen(&rpcInit);
  if (pMgmt->clientRpc == NULL) {
S
Shengliang 已提交
68
    dError("failed to init dnode rpc client");
S
Shengliang Guan 已提交
69 70 71
    return -1;
  }

72
  dDebug("dnode rpc client is initialized");
S
Shengliang Guan 已提交
73 74 75
  return 0;
}

S
shm  
Shengliang Guan 已提交
76
void dndCleanupClient(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
77
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
78 79 80
  if (pMgmt->clientRpc) {
    rpcClose(pMgmt->clientRpc);
    pMgmt->clientRpc = NULL;
81
    dDebug("dnode rpc client is closed");
S
Shengliang Guan 已提交
82 83 84
  }
}

S
Shengliang Guan 已提交
85
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
S
shm  
Shengliang Guan 已提交
86
  SDnode     *pDnode = param;
S
shm  
Shengliang Guan 已提交
87
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
88
  tmsg_t      msgType = pReq->msgType;
S
Shengliang Guan 已提交
89

H
Hongze Cheng 已提交
90
  if (msgType == TDMT_DND_NETWORK_TEST) {
S
Shengliang 已提交
91
    dTrace("RPC %p, network test req will be processed, app:%p", pReq->handle, pReq->ahandle);
S
shm  
Shengliang Guan 已提交
92
    dndProcessStartupReq(pDnode, pReq);
S
Shengliang Guan 已提交
93 94 95
    return;
  }

S
shm  
Shengliang Guan 已提交
96
  if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
S
Shengliang 已提交
97 98
    dError("RPC %p, req:%s ignored since dnode exiting, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_OFFLINE, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
99
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
100
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
101
    return;
S
shm  
Shengliang Guan 已提交
102
  } else if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
S
Shengliang 已提交
103 104
    dError("RPC %p, req:%s ignored since dnode not running, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
105
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
106
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
107 108 109
    return;
  }

S
Shengliang Guan 已提交
110
  if (pReq->pCont == NULL) {
S
Shengliang 已提交
111 112
    dTrace("RPC %p, req:%s not processed since its empty, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
113 114 115 116
    rpcSendResponse(&rspMsg);
    return;
  }

S
shm  
Shengliang Guan 已提交
117
  SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
S
shm  
Shengliang Guan 已提交
118
  if (pHandle->msgFp != NULL) {
S
shm  
Shengliang Guan 已提交
119 120
    dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name,
           pReq->ahandle);
S
shm  
Shengliang Guan 已提交
121
    dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
S
Shengliang Guan 已提交
122
  } else {
S
Shengliang 已提交
123 124
    dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
    SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
S
Shengliang Guan 已提交
125
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
126
    rpcFreeCont(pReq->pCont);
S
Shengliang Guan 已提交
127 128 129 130
  }
}

static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
S
shm  
Shengliang Guan 已提交
131
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
132 133

  SEpSet epSet = {0};
S
shm  
Shengliang Guan 已提交
134
  dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet);
S
Shengliang Guan 已提交
135 136 137
  rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}

S
shm  
Shengliang Guan 已提交
138 139 140 141
static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  int32_t code = 0;
  char    pass[TSDB_PASSWORD_LEN + 1] = {0};

S
Shengliang Guan 已提交
142
  if (strcmp(user, INTERNAL_USER) == 0) {
S
Shengliang Guan 已提交
143
    taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
S
Shengliang Guan 已提交
144
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
S
Shengliang Guan 已提交
145
    taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
S
shm  
Shengliang Guan 已提交
146 147 148 149 150
  } else {
    code = -1;
  }

  if (code == 0) {
151
    memcpy(secret, pass, TSDB_PASSWORD_LEN);
S
Shengliang Guan 已提交
152
    *spi = 1;
S
Shengliang Guan 已提交
153 154 155
    *encrypt = 0;
    *ckey = 0;
  }
S
shm  
Shengliang Guan 已提交
156 157

  return code;
S
Shengliang Guan 已提交
158 159
}

S
Shengliang Guan 已提交
160 161 162
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  SDnode *pDnode = parent;

S
shm  
Shengliang Guan 已提交
163
  if (dndGetHideUserAuth(parent, user, spi, encrypt, secret, ckey) == 0) {
S
Shengliang 已提交
164
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
S
Shengliang Guan 已提交
165 166 167
    return 0;
  }

S
shm  
Shengliang Guan 已提交
168
  if (mmGetUserAuth(dndGetWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) {
S
Shengliang 已提交
169
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
S
Shengliang Guan 已提交
170 171 172 173
    return 0;
  }

  if (terrno != TSDB_CODE_APP_NOT_READY) {
S
Shengliang 已提交
174
    dTrace("failed to get user auth from mnode since %s", terrstr());
S
Shengliang Guan 已提交
175
    return -1;
S
Shengliang Guan 已提交
176 177
  }

S
Shengliang Guan 已提交
178 179 180
  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);
  int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
S
shm  
Shengliang Guan 已提交
181
  void   *pReq = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
182
  tSerializeSAuthReq(pReq, contLen, &authReq);
S
Shengliang Guan 已提交
183

S
Shengliang Guan 已提交
184
  SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
S
Shengliang Guan 已提交
185
  SRpcMsg rpcRsp = {0};
S
Shengliang Guan 已提交
186
  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
S
Shengliang Guan 已提交
187 188 189 190
  dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    terrno = rpcRsp.code;
S
Shengliang Guan 已提交
191
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
S
Shengliang Guan 已提交
192
  } else {
S
Shengliang Guan 已提交
193 194 195 196 197 198 199 200
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;
    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
S
Shengliang Guan 已提交
201 202 203 204 205 206
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
shm  
Shengliang Guan 已提交
207
int32_t dndInitServer(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
208
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
209

S
config  
Shengliang Guan 已提交
210
  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
S
Shengliang Guan 已提交
211 212 213 214 215 216
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
shm  
Shengliang Guan 已提交
217
  rpcInit.localPort = pDnode->serverPort;
S
shm  
Shengliang Guan 已提交
218
  rpcInit.label = "DND";
S
Shengliang Guan 已提交
219 220
  rpcInit.numOfThreads = numOfThreads;
  rpcInit.cfp = dndProcessRequest;
S
Shengliang Guan 已提交
221
  rpcInit.sessions = tsMaxShellConns;
S
Shengliang Guan 已提交
222
  rpcInit.connType = TAOS_CONN_SERVER;
S
Shengliang Guan 已提交
223
  rpcInit.idleTime = tsShellActivityTimer * 1000;
S
Shengliang Guan 已提交
224
  rpcInit.afp = dndRetrieveUserAuthInfo;
225
  rpcInit.parent = pDnode;
S
Shengliang Guan 已提交
226 227 228

  pMgmt->serverRpc = rpcOpen(&rpcInit);
  if (pMgmt->serverRpc == NULL) {
S
Shengliang 已提交
229
    dError("failed to init dnode rpc server");
S
Shengliang Guan 已提交
230 231 232
    return -1;
  }

233
  dDebug("dnode rpc server is initialized");
S
Shengliang Guan 已提交
234 235 236
  return 0;
}

S
shm  
Shengliang Guan 已提交
237
void dndCleanupServer(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
238
  STransMgmt *pMgmt = &pDnode->trans;
S
Shengliang Guan 已提交
239 240 241
  if (pMgmt->serverRpc) {
    rpcClose(pMgmt->serverRpc);
    pMgmt->serverRpc = NULL;
242
    dDebug("dnode rpc server is closed");
S
Shengliang Guan 已提交
243 244 245
  }
}

S
shm  
Shengliang Guan 已提交
246
int32_t dndInitMsgHandle(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
247
  STransMgmt *pMgmt = &pDnode->trans;
S
shm  
Shengliang Guan 已提交
248

S
shm  
Shengliang Guan 已提交
249 250
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
251 252

    for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
S
shm  
Shengliang Guan 已提交
253 254
      NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
      if (msgFp == NULL) continue;
S
shm  
Shengliang Guan 已提交
255 256

      SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
S
shm  
Shengliang Guan 已提交
257
      if (pHandle->msgFp != NULL) {
S
shm  
Shengliang Guan 已提交
258
        dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex],
S
shm  
Shengliang Guan 已提交
259 260 261
               pHandle->pWrapper->name, pWrapper->name);
        return -1;
      } else {
S
shm  
Shengliang Guan 已提交
262
        dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name);
S
shm  
Shengliang Guan 已提交
263
        pHandle->msgFp = msgFp;
S
shm  
Shengliang Guan 已提交
264
        pHandle->pWrapper = pWrapper;
S
shm  
Shengliang Guan 已提交
265 266 267 268 269 270 271
      }
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
272
static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
S
Shengliang Guan 已提交
273 274 275 276 277
  if (pMgmt->clientRpc == NULL) {
    terrno = TSDB_CODE_DND_OFFLINE;
    return -1;
  }

S
Shengliang Guan 已提交
278
  rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL);
S
Shengliang Guan 已提交
279
  return 0;
S
Shengliang Guan 已提交
280 281
}

S
shm  
Shengliang Guan 已提交
282 283
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) {
  SMgmtWrapper *pWrapper = wrapper;
S
shm  
Shengliang Guan 已提交
284 285 286 287 288 289

  if (pWrapper->procType == PROC_CHILD) {
  } else {
    STransMgmt *pTrans = &pWrapper->pDnode->trans;
    return dndSendRpcReq(pTrans, pEpSet, pReq);
  }
S
shm  
Shengliang Guan 已提交
290 291 292 293 294
}

int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) {
  SMgmtWrapper *pWrapper = wrapper;

S
shm  
Shengliang Guan 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
  if (pWrapper->procType == PROC_CHILD) {
  } else {
    SDnode     *pDnode = pWrapper->pDnode;
    STransMgmt *pTrans = &pDnode->trans;
    SEpSet      epSet = {0};
    dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet);
    return dndSendRpcReq(pTrans, &epSet, pReq);
  }
}

void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
  if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
    dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp);
  } else {
    rpcSendResponse(pRsp);
  }
}

void dndSendRsp(void *wrapper, SRpcMsg *pRsp) {
  SMgmtWrapper *pWrapper = wrapper;

  if (pWrapper->procType == PROC_CHILD) {
    int32_t code = -1;
    do {
      code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen);
      if (code != 0) {
        taosMsleep(10);
      }
    } while (code != 0);
  } else {
    dndSendRpcRsp(pWrapper, pRsp);
  }
D
dapan1121 已提交
327
}