/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ /* this file is mainly responsible for the communication between DNODEs. Each * dnode works as both server and client. Dnode may send status, grant, config * messages to mnode, mnode may send create/alter/drop table/vnode messages * to dnode. All theses messages are handled from here */ #define _DEFAULT_SOURCE #include "dnodeMInfos.h" #include "dnodeMPeer.h" #include "dnodeStep.h" #include "dnodeVMgmt.h" #include "dnodeVWrite.h" #include "mnode.h" #include "os.h" static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); static void *tsServerRpc = NULL; static void *tsClientRpc = NULL; int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); rpcInitial.localPort = tsDnodeDnodePort; rpcInitial.label = "DND-S"; rpcInitial.numOfThreads = 1; rpcInitial.cfp = dnodeProcessReqMsgFromDnode; rpcInitial.sessions = TSDB_MAX_VNODES << 4; rpcInitial.connType = TAOS_CONN_SERVER; rpcInitial.idleTime = tsShellActivityTimer * 1000; tsServerRpc = rpcOpen(&rpcInitial); if (tsServerRpc == NULL) { dError("failed to init inter-dnodes RPC server"); return -1; } dInfo("dnode inter-dnodes RPC server is initialized"); return 0; } void dnodeCleanupServer() { if (tsServerRpc) { rpcClose(tsServerRpc); tsServerRpc = NULL; dInfo("inter-dnodes RPC server is closed"); } } static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { dnodeSendStartupStep(pMsg); return; } if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) { rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); return; } if (pMsg->pCont == NULL) { rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN; rpcSendResponse(&rspMsg); return; } if (dnodeProcessReqMsgFp[pMsg->msgType]) { (*dnodeProcessReqMsgFp[pMsg->msgType])(pMsg); } else { dDebug("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); } } int32_t dnodeInitClient() { char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); rpcInitial.label = "DND-C"; rpcInitial.numOfThreads = 1; rpcInitial.cfp = dnodeProcessRspFromDnode; rpcInitial.sessions = TSDB_MAX_VNODES << 4; rpcInitial.connType = TAOS_CONN_CLIENT; rpcInitial.idleTime = tsShellActivityTimer * 1000; rpcInitial.user = "t"; rpcInitial.ckey = "key"; rpcInitial.secret = secret; tsClientRpc = rpcOpen(&rpcInitial); if (tsClientRpc == NULL) { dError("failed to init mnode rpc client"); return -1; } dInfo("dnode inter-dnodes rpc client is initialized"); return 0; } void dnodeCleanupClient() { if (tsClientRpc) { rpcClose(tsClientRpc); tsClientRpc = NULL; dInfo("dnode inter-dnodes rpc client is closed"); } } static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; dTrace("msg:%p is ignored since dnode is stopping", pMsg); rpcFreeCont(pMsg->pCont); return; } if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { dnodeUpdateEpSetForPeer(pEpSet); } if (dnodeProcessRspMsgFp[pMsg->msgType]) { (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { mnodeProcessPeerRsp(pMsg); } rpcFreeCont(pMsg->pCont); } void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; } void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetEpSetForPeer(&epSet); assert(tsClientRpc != 0); rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp); } void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetEpSetForPeer(&epSet); assert(tsClientRpc != 0); rpcSendRecvWithTimeout(tsClientRpc, &epSet, rpcMsg, rpcRsp); } void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); }