dmTransport.c 11.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18
#include "qworker.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tversion.h"
20

21
/* Response sender exposed through SMsgCb.sendRspFp: forwards the message to the RPC layer unchanged. */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  rpcSendResponse(pMsg);
}
S
Shengliang Guan 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36

/*
 * Fill pMsg with a serialized mnode endpoint set so the requester can be redirected.
 *
 * The endpoint set is obtained from dmGetMnodeEpSetForRedirect(), sized with a
 * dry-run serialization (NULL buffer), then written into a freshly allocated
 * RPC content buffer. If that allocation fails, only pMsg->code is changed
 * (to TSDB_CODE_OUT_OF_MEMORY) and contLen is left untouched.
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet redirectEps = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &redirectEps);

  // First pass with a NULL buffer only computes the encoded length.
  const int32_t encodedLen = tSerializeSEpSet(NULL, 0, &redirectEps);

  pMsg->pCont = rpcMallocCont(encodedLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  tSerializeSEpSet(pMsg->pCont, encodedLen, &redirectEps);
  pMsg->contLen = encodedLen;
}

S
Shengliang Guan 已提交
37
/*
 * Dispatch a message to the handler that pWrapper registered for its type.
 *
 * Looks up pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]. When no handler is
 * registered, sets terrno to TSDB_CODE_MSG_NOT_PROCESSED and returns -1.
 * Otherwise records the wrapper in pMsg->info.wrapper and returns whatever
 * the handler returns.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // `trace` is referenced implicitly by the dGError/dGTrace logging macros.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (msgFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    return -1;
  }

  dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
  pMsg->info.wrapper = pWrapper;
  return (*msgFp)(pWrapper->pMgmt, pMsg);
}

dengyihao's avatar
dengyihao 已提交
52 53
/*
 * Fail-fast predicate installed as SRpcInit.ffp by dmInitClient().
 * Returns true only for sync heartbeat and sync append-entries messages.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  // add more msg type later
  return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
}

dengyihao's avatar
dengyihao 已提交
57 58 59 60 61 62 63 64 65
/*
 * Rewrite terrno from TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED when
 * the message type lies strictly inside the vnode (TDMT_VND_MSG..TDMT_VND_MAX_MSG)
 * or scheduler (TDMT_SCH_MSG..TDMT_SCH_MAX_MSG) range. Any other terrno value
 * or message type leaves terrno untouched.
 */
static void dmConvertErrCode(tmsg_t msgType) {
  const bool inVnodeRange = (msgType > TDMT_VND_MSG) && (msgType < TDMT_VND_MAX_MSG);
  const bool inSchedRange = (msgType > TDMT_SCH_MSG) && (msgType < TDMT_SCH_MAX_MSG);

  if (terrno == TSDB_CODE_APP_IS_STOPPING && (inVnodeRange || inSchedRange)) {
    terrno = TSDB_CODE_VND_STOPPED;
  }
}
S
Shengliang Guan 已提交
66
/*
 * RPC callback installed as SRpcInit.cfp by dmInitClient() and dmInitServer().
 *
 * Flow:
 *   1. Reject messages whose client version is not compatible with this server.
 *   2. Handle a few message types inline (net test, query-worker responses,
 *      mnode status response) without queueing.
 *   3. Refuse everything except TDMT_DND_SERVER_STATUS while the dnode is not
 *      in DND_STAT_RUNNING (or is not attached yet, i.e. pDnode == NULL).
 *   4. Validate the payload, pick the target wrapper (optionally by the vgId in
 *      the message head), copy the message into a queue item and dispatch it
 *      through dmProcessNodeMsg().
 *
 * On any failure (code != 0 at _OVER) a response carrying the error code is
 * sent back for requests, the queue item (if any) is freed, and the RPC
 * content buffer is released. On success the buffer is not freed here.
 *
 * pDnode may be NULL (see the TD-22618 note below), so nothing derived from
 * it is touched before the NULL check.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = NULL;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = NULL;

  // `trace` is referenced implicitly by the dGError/dGTrace logging macros.
  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  taosVersionStrToInt(version, &svrVer);
  if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) {
    dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer);
    goto _OVER;
  }

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
      qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      // pDnode can still be NULL here (TD-22618); skip the ep-set update rather
      // than hand dmSetMnodeEpSet() an address derived from a NULL pointer.
      if (pDnode != NULL && pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode == NULL) {
    terrno = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    }

    if (pDnode->status == DND_STAT_INIT) {
      terrno = TSDB_CODE_APP_IS_STARTING;
    } else {
      terrno = TSDB_CODE_APP_IS_STOPPING;
    }
    goto _OVER;
  }

  // Safe to look at the transport tables now that pDnode is known to be valid.
  pTrans = &pDnode->trans;
  pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    terrno = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
    terrno = pRpc->code;
    goto _OVER;
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // NOTE(review): only contLen > 0 is checked before reading the head;
      // confirm the RPC layer guarantees at least sizeof(SMsgHead) bytes.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      terrno = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if (dmMarkWrapper(pWrapper) != 0) {
    // The wrapper was not marked, so it must not be released at _OVER.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;
  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
  if (pMsg == NULL) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    dmConvertErrCode(pRpc->msgType);
    if (terrno != 0) code = terrno;
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, terrstr());
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info};
      // The redirect builder reads pDnode->data, so it needs a valid pDnode.
      if (code == TSDB_CODE_MNODE_NOT_FOUND && pDnode != NULL) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Both the wrapper and no-wrapper paths previously ended in the same
      // rpcSendResponse() call, so a single call is equivalent.
      rpcSendResponse(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}

S
Shengliang Guan 已提交
221
/*
 * Populate the dnode message routing tables from every node type's handler list.
 *
 * For each node type in [DNODE, NODE_END) the wrapper's getHandlesFp() is
 * called. Each returned SMgmtHandle then:
 *   - sets needCheckVgId on the shared per-message-type slot when the handle
 *     asks for it, otherwise records this node type as the slot's default;
 *   - registers the handler function in the wrapper's msgFps table.
 *
 * Returns 0 on success, or -1 as soon as a wrapper returns a NULL handle array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pArray = (*pWrapper->func.getHandlesFp)();
    if (pArray == NULL) return -1;

    // The array is not modified inside the loop, so read its size once.
    const int32_t numOfHandles = (int32_t)taosArrayGetSize(pArray);
    for (int32_t i = 0; i < numOfHandles; ++i) {
      SMgmtHandle  *pMgmt = taosArrayGet(pArray, i);
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];

      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        pHandle->defaultNtype = ntype;
      }
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
    }

    taosArrayDestroy(pArray);
  }

  return 0;
}

S
Shengliang Guan 已提交
247
/*
 * Send a request through the dnode's client RPC handle.
 *
 * The request is sent when the dnode is in DND_STAT_RUNNING, or regardless of
 * status when the message type is >= TDMT_SYNC_MSG. Otherwise the message
 * content is freed, terrno is set to TSDB_CODE_APP_IS_STARTING (status
 * DND_STAT_INIT) or TSDB_CODE_APP_IS_STOPPING, and -1 is returned.
 *
 * Returns 0 once the request has been passed to rpcSendRequest(); that call's
 * own return value is not inspected.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING || pMsg->msgType >= TDMT_SYNC_MSG) {
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    return 0;
  }

  // Not running: the message is dropped here, so release its content buffer.
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  if (pDnode->status == DND_STAT_INIT) {
    terrno = TSDB_CODE_APP_IS_STARTING;
  } else {
    terrno = TSDB_CODE_APP_IS_STOPPING;
  }
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
  return -1;
}

265
/* Callback exposed through SMsgCb.registerBrokenLinkArgFp: forwards to rpcRegisterBrokenLinkArg(). */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  rpcRegisterBrokenLinkArg(pMsg);
}
S
shm  
Shengliang Guan 已提交
266

267
/* Callback exposed through SMsgCb.releaseHandleFp: forwards both arguments to rpcReleaseHandle(). */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  rpcReleaseHandle(pHandle, type);
}
S
shm  
Shengliang Guan 已提交
268

dengyihao's avatar
dengyihao 已提交
269
/*
 * Predicate installed as SRpcInit.rfp by dmInitClient().
 *
 * Returns true when `code` is one of the listed transient conditions
 * (network/link failure, mnode not found, sync not leader / restoring,
 * vnode stopped, app starting/stopping) AND the message is not a scheduler
 * query/fetch request (TDMT_SCH_QUERY, TDMT_SCH_MERGE_QUERY, TDMT_SCH_FETCH,
 * TDMT_SCH_MERGE_FETCH). Returns false in every other case.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool isListedCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
      code == TSDB_CODE_APP_IS_STOPPING;
  if (!isListedCode) {
    return false;
  }

  const bool isSchedQueryOrFetch = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                                   msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH;
  return !isSchedQueryOrFetch;
}
M
Minghao Li 已提交
283

284
/*
 * Open the dnode's client-side RPC handle and store it in pDnode->trans.clientRpc.
 *
 * Configures an SRpcInit with label "DND-C", the shared dmProcessRpcMsg
 * callback, the rpcRfp and dmFailFastFp predicates, retry/fail-fast settings
 * taken from the ts* globals, and a per-thread connection limit derived from
 * tsNumOfRpcSessions and clamped to [10, 500].
 *
 * Returns 0 on success, -1 if rpcOpen() returns NULL.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DND-C";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Share the session budget across threads, then clamp to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.batchSize = 8 * 1024;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  taosVersionStrToInt(version, &(rpcInit.compatibilityVer));

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client");
    return -1;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}

329
/*
 * Close the client RPC handle if one is open and clear the stored pointer,
 * so a repeated call does nothing.
 */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc) {
    rpcClose(pTrans->clientRpc);
    pTrans->clientRpc = NULL;
    dDebug("dnode rpc client is closed");
  }
}

338
/*
 * Open the dnode's server-side RPC handle and store it in pDnode->trans.serverRpc.
 *
 * Configures an SRpcInit with label "DND-S", the local FQDN and port from the
 * ts* globals, the shared dmProcessRpcMsg callback and the server connection
 * type.
 *
 * Returns 0 on success, -1 if rpcOpen() returns NULL.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.compressSize = tsCompressMsgSize;
  taosVersionStrToInt(version, &(rpcInit.compatibilityVer));

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

363
/*
 * Close the server RPC handle if one is open and clear the stored pointer,
 * so a repeated call does nothing.
 */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc) {
    rpcClose(pTrans->serverRpc);
    pTrans->serverRpc = NULL;
    dDebug("dnode rpc server is closed");
  }
}

372 373 374 375 376 377 378 379
/*
 * Build and return (by value) an SMsgCb that bundles the dnode's client RPC
 * handle, a pointer to pDnode->data, and this file's transport helpers
 * together with dmReportStartup and dmUpdateDnodeInfo.
 *
 * The clientRpc field is copied as it is at call time.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {
      .clientRpc = pDnode->trans.clientRpc,
      .sendReqFp = dmSendReq,
      .sendRspFp = dmSendRsp,
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
      .releaseHandleFp = dmReleaseHandle,
      .reportStartupFp = dmReportStartup,
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
      .data = &pDnode->data,
  };
  return msgCb;
}