From 401cf95662b7d9c56de0f773cd50abeffae68c8f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 1 Nov 2021 20:08:54 +0800 Subject: [PATCH] refact file directory --- .../mgmt/inc/{dnodeEps.h => dnodeConfig.h} | 6 +- source/dnode/mgmt/inc/dnodeDnode.h | 34 ++ source/dnode/mgmt/inc/dnodeMnode.h | 30 ++ .../inc/{dnodeTrans.h => dnodeTransport.h} | 6 +- .../mgmt/inc/{dnodeMsg.h => dnodeVnodes.h} | 6 +- .../mgmt/src/{dnodeEps.c => dnodeConfig.c} | 2 +- source/dnode/mgmt/src/dnodeDnode.c | 174 ++++++++ source/dnode/mgmt/src/dnodeInt.c | 6 +- source/dnode/mgmt/src/dnodeTransport.c | 382 ++++++++++++++++++ source/dnode/mgmt/src/dnodeVnodes.c | 0 source/server/dnode/src/dnodeInt.c | 204 ++++++++++ .../mgmt => server/dnode}/src/dnodeMsg.c | 4 +- .../mgmt => server/dnode}/src/dnodeTrans.c | 6 +- 13 files changed, 842 insertions(+), 18 deletions(-) rename source/dnode/mgmt/inc/{dnodeEps.h => dnodeConfig.h} (93%) create mode 100644 source/dnode/mgmt/inc/dnodeDnode.h create mode 100644 source/dnode/mgmt/inc/dnodeMnode.h rename source/dnode/mgmt/inc/{dnodeTrans.h => dnodeTransport.h} (90%) rename source/dnode/mgmt/inc/{dnodeMsg.h => dnodeVnodes.h} (91%) rename source/dnode/mgmt/src/{dnodeEps.c => dnodeConfig.c} (99%) create mode 100644 source/dnode/mgmt/src/dnodeDnode.c create mode 100644 source/dnode/mgmt/src/dnodeTransport.c create mode 100644 source/dnode/mgmt/src/dnodeVnodes.c create mode 100644 source/server/dnode/src/dnodeInt.c rename source/{dnode/mgmt => server/dnode}/src/dnodeMsg.c (99%) rename source/{dnode/mgmt => server/dnode}/src/dnodeTrans.c (99%) diff --git a/source/dnode/mgmt/inc/dnodeEps.h b/source/dnode/mgmt/inc/dnodeConfig.h similarity index 93% rename from source/dnode/mgmt/inc/dnodeEps.h rename to source/dnode/mgmt/inc/dnodeConfig.h index ac68d16374..9d446a9f1a 100644 --- a/source/dnode/mgmt/inc/dnodeEps.h +++ b/source/dnode/mgmt/inc/dnodeConfig.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DNODE_EPS_H_ -#define _TD_DNODE_EPS_H_ +#ifndef _TD_DNODE_CONFIG_H_ +#define _TD_DNODE_CONFIG_H_ #ifdef __cplusplus extern "C" { @@ -39,4 +39,4 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); } #endif -#endif /*_TD_DNODE_EPS_H_*/ \ No newline at end of file +#endif /*_TD_DNODE_CONFIG_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/inc/dnodeDnode.h b/source/dnode/mgmt/inc/dnodeDnode.h new file mode 100644 index 0000000000..35f68f0306 --- /dev/null +++ b/source/dnode/mgmt/inc/dnodeDnode.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_DNODE_H_ +#define _TD_DNODE_DNODE_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dnodeInt.h" + +int32_t dnodeInitMsg(); +void dnodeCleanupMsg(); +void dnodeProcessStatusRsp(SRpcMsg *pMsg); +void dnodeProcessStartupReq(SRpcMsg *pMsg); +void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_DNODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/inc/dnodeMnode.h b/source/dnode/mgmt/inc/dnodeMnode.h new file mode 100644 index 0000000000..0dccb8c39d --- /dev/null +++ b/source/dnode/mgmt/inc/dnodeMnode.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_MNODE_H_ +#define _TD_DNODE_MNODE_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dnodeInt.h" + +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_MNODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/inc/dnodeTrans.h b/source/dnode/mgmt/inc/dnodeTransport.h similarity index 90% rename from source/dnode/mgmt/inc/dnodeTrans.h rename to source/dnode/mgmt/inc/dnodeTransport.h index f2dc647de3..e8223f4c06 100644 --- a/source/dnode/mgmt/inc/dnodeTrans.h +++ b/source/dnode/mgmt/inc/dnodeTransport.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DNODE_TRANS_H_ -#define _TD_DNODE_TRANS_H_ +#ifndef _TD_DNODE_TRANSPORT_H_ +#define _TD_DNODE_TRANSPORT_H_ #ifdef __cplusplus extern "C" { @@ -30,4 +30,4 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); } #endif -#endif /*_TD_DNODE_TRANS_H_*/ +#endif /*_TD_DNODE_TRANSPORT_H_*/ diff --git a/source/dnode/mgmt/inc/dnodeMsg.h b/source/dnode/mgmt/inc/dnodeVnodes.h similarity index 91% rename from source/dnode/mgmt/inc/dnodeMsg.h rename to source/dnode/mgmt/inc/dnodeVnodes.h index 0790fa7e3e..d54fda654a 100644 --- a/source/dnode/mgmt/inc/dnodeMsg.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DNODE_STATUS_H_ -#define _TD_DNODE_STATUS_H_ +#ifndef _TD_DNODE_VNODES_H_ +#define _TD_DNODE_VNODES_H_ #ifdef __cplusplus extern "C" { @@ -32,4 +32,4 @@ void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); } #endif -#endif /*_TD_DNODE_STATUS_H_*/ \ No newline at end of file +#endif /*_TD_DNODE_VNODES_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeEps.c b/source/dnode/mgmt/src/dnodeConfig.c similarity index 99% rename from source/dnode/mgmt/src/dnodeEps.c rename to source/dnode/mgmt/src/dnodeConfig.c index 5b843df2f2..fc4c821bb6 100644 --- a/source/dnode/mgmt/src/dnodeEps.c +++ b/source/dnode/mgmt/src/dnodeConfig.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dnodeEps.h" +#include "dnodeConfig.h" #include "cJSON.h" #include "thash.h" diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c new file mode 100644 index 0000000000..0a598527b4 --- /dev/null +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dnodeDnode.h" +#include "dnodeConfig.h" +#include "mnode.h" +#include "tthread.h" +#include "ttime.h" +#include "vnode.h" + +static struct { + pthread_t *threadId; + bool stop; + uint32_t rebootTime; +} tsMsg; + +static void dnodeSendStatusMsg() { + int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + dError("failed to malloc status message"); + return; + } + + pStatus->version = htonl(tsVersion); + pStatus->dnodeId = htonl(dnodeGetDnodeId()); + tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); + pStatus->clusterId = htobe64(dnodeGetClusterId()); + pStatus->lastReboot = htonl(tsMsg.rebootTime); + pStatus->numOfCores = htonl(tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + + // fill cluster cfg parameters + pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); + pStatus->clusterCfg.checkTime = 0; + tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64); + char timestr[32] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); + + // vnodeGetStatus(NULL, pStatus); + // contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); + // pStatus->openVnodes = htons(pStatus->openVnodes); + + SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; + + dnodeSendMsgToMnode(&rpcMsg); +} + +void dnodeProcessStatusRsp(SRpcMsg *pMsg) { + dTrace("status rsp is received, code:%s", tstrerror(pMsg->code)); + if (pMsg->code != TSDB_CODE_SUCCESS) return; + + SStatusRsp *pStatusRsp = pMsg->pCont; + + SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->clusterId = htobe64(pCfg->clusterId); + pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); + pCfg->numOfDnodes = htonl(pCfg->numOfDnodes); + dnodeUpdateCfg(pCfg); + + if (pCfg->dropped) { + dError("status rsp is received, and set dnode to drop status"); + return; + } + + // vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); + + SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); + eps->dnodeNum = htonl(eps->dnodeNum); + for (int32_t i = 0; i < eps->dnodeNum; ++i) { + eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId); + eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort); + } + + dnodeUpdateDnodeEps(eps); +} + +static void *dnodeThreadRoutine(void *param) { + int32_t ms = tsStatusInterval * 1000; + while (!tsMsg.stop) { + taosMsleep(ms); + dnodeSendStatusMsg(); + } +} + +int32_t dnodeInitMsg() { + tsMsg.stop = false; + tsMsg.rebootTime = taosGetTimestampSec(); + tsMsg.threadId = taosCreateThread(dnodeThreadRoutine, NULL); + if (tsMsg.threadId == NULL) { + return -1; + } + + dInfo("dnode msg is initialized"); + return 0; +} + +void dnodeCleanupMsg() { + if (tsMsg.threadId != NULL) { + tsMsg.stop = true; + taosDestoryThread(tsMsg.threadId); + tsMsg.threadId = NULL; + } + + dInfo("dnode msg is cleanuped"); +} + +static int32_t dnodeStartMnode(SRpcMsg *pMsg) { + SCreateMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->mnodeNum = htonl(pCfg->mnodeNum); + for (int32_t i = 0; i < pCfg->mnodeNum; ++i) { + pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId); + pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort); + } + + if (pCfg->dnodeId != dnodeGetDnodeId()) { + dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); + return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + } + + if (mnodeGetStatus() == MN_STATUS_READY) return 0; + + return mnodeDeploy(); +} + +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { + int32_t code = dnodeStartMnode(pMsg); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { + SCfgDnodeMsg *pCfg = pMsg->pCont; + + int32_t code = taosCfgDynamicOptions(pCfg->config); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessStartupReq(SRpcMsg *pMsg) { + dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); + + SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); + dnodeGetStartup(pStep); + + dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); + + SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); +} diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c index 1166a06d38..aa66ab3da3 100644 --- a/source/dnode/mgmt/src/dnodeInt.c +++ b/source/dnode/mgmt/src/dnodeInt.c @@ -15,9 +15,9 @@ #define _DEFAULT_SOURCE #include "dnodeCheck.h" -#include "dnodeEps.h" -#include "dnodeMsg.h" -#include "dnodeTrans.h" +#include "dnodeConfig.h" +#include "dnodeDnode.h" +#include "dnodeTransport.h" #include "mnode.h" #include "sync.h" #include "tcache.h" diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c new file mode 100644 index 0000000000..68ec0a44e5 --- /dev/null +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -0,0 +1,382 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +/* this file is mainly responsible for the communication between DNODEs. Each + * dnode works as both server and client. Dnode may send status, grant, config + * messages to mnode, mnode may send create/alter/drop table/vnode messages + * to dnode. All theses messages are handled from here + */ + +#define _DEFAULT_SOURCE +#include "dnodeTransport.h" +#include "dnodeConfig.h" +#include "dnodeDnode.h" +#include "dnodeMnode.h" +#include "dnodeVnodes.h" +#include "mnode.h" +#include "vnode.h" + +typedef void (*MsgFp)(SRpcMsg *pMsg); + +static struct { + void *serverRpc; + void *clientRpc; + void *shellRpc; + MsgFp msgFp[TSDB_MSG_TYPE_MAX]; +} tsTrans; + +static void dnodeInitMsgFp() { + // msg from client to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; + + // msg from client to mnode + tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessMsg; + + // message from mnode to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = vnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = NULL; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = NULL; + tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = mnodeProcessMsg; + + // message from dnode to mnode + tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = NULL; + tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = NULL; + tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; +} + +static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; + int32_t msgType = pMsg->msgType; + + if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { + dnodeProcessStartupReq(pMsg); + return; + } + + if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { + rspMsg.code = TSDB_CODE_APP_NOT_READY; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); + return; + } + + if (pMsg->pCont == NULL) { + rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN; + rpcSendResponse(&rspMsg); + return; + } + + MsgFp fp = tsTrans.msgFp[msgType]; + if (fp != NULL) { + dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]); + (*fp)(pMsg); + } else { + dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + } +} + +static int32_t dnodeInitServer() { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = tsDnodeDnodePort; + rpcInit.label = "DND-S"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessPeerReq; + rpcInit.sessions = TSDB_MAX_VNODES << 4; + rpcInit.connType = TAOS_CONN_SERVER; + rpcInit.idleTime = tsShellActivityTimer * 1000; + + tsTrans.serverRpc = rpcOpen(&rpcInit); + if (tsTrans.serverRpc == NULL) { + dError("failed to init peer rpc server"); + return -1; + } + + dInfo("dnode peer rpc server is initialized"); + return 0; +} + +static void dnodeCleanupServer() { + if (tsTrans.serverRpc) { + rpcClose(tsTrans.serverRpc); + tsTrans.serverRpc = NULL; + dInfo("dnode peer server is closed"); + } +} + +static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + int32_t msgType = pMsg->msgType; + + if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { + if (pMsg == NULL || pMsg->pCont == NULL) return; + dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); + rpcFreeCont(pMsg->pCont); + return; + } + + if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { + dnodeUpdateMnodeEps(pEpSet); + } + + MsgFp fp = tsTrans.msgFp[msgType]; + if (fp != NULL) { + dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); + (*fp)(pMsg); + } else { + dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); + } + + rpcFreeCont(pMsg->pCont); +} + +static int32_t dnodeInitClient() { + char secret[TSDB_KEY_LEN] = "secret"; + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.label = "DND-C"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessPeerRsp; + rpcInit.sessions = TSDB_MAX_VNODES << 4; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.user = "t"; + rpcInit.ckey = "key"; + rpcInit.secret = secret; + + tsTrans.clientRpc = rpcOpen(&rpcInit); + if (tsTrans.clientRpc == NULL) { + dError("failed to init peer rpc client"); + return -1; + } + + dInfo("dnode peer rpc client is initialized"); + return 0; +} + +static void dnodeCleanupClient() { + if (tsTrans.clientRpc) { + rpcClose(tsTrans.clientRpc); + tsTrans.clientRpc = NULL; + dInfo("dnode peer rpc client is closed"); + } +} + +static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; + int32_t msgType = pMsg->msgType; + + if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { + dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_DND_EXITING; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + return; + } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { + dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_APP_NOT_READY; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + return; + } + + if (pMsg->pCont == NULL) { + rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN; + rpcSendResponse(&rspMsg); + return; + } + + MsgFp fp = tsTrans.msgFp[msgType]; + if (fp != NULL) { + dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]); + (*fp)(pMsg); + } else { + dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + } +} + +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } + +void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { + SRpcEpSet epSet = {0}; + dnodeGetEpSetForPeer(&epSet); + dnodeSendMsgToDnode(&epSet, rpcMsg); +} + +static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { + SRpcEpSet epSet = {0}; + dnodeGetEpSetForPeer(&epSet); + rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); +} + +static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); + if (code != TSDB_CODE_APP_NOT_READY) return code; + + SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); + tstrncpy(pMsg->user, user, sizeof(pMsg->user)); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pMsg; + rpcMsg.contLen = sizeof(SAuthMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH; + + dDebug("user:%s, send auth msg to mnodes", user); + SRpcMsg rpcRsp = {0}; + dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); + + if (rpcRsp.code != 0) { + dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); + } else { + SAuthRsp *pRsp = rpcRsp.pCont; + dDebug("user:%s, auth msg received from mnodes", user); + memcpy(secret, pRsp->secret, TSDB_KEY_LEN); + memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); + *spi = pRsp->spi; + *encrypt = pRsp->encrypt; + } + + rpcFreeCont(rpcRsp.pCont); + return rpcRsp.code; +} + +static int32_t dnodeInitShell() { + int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); + if (numOfThreads < 1) { + numOfThreads = 1; + } + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = tsDnodeShellPort; + rpcInit.label = "SHELL"; + rpcInit.numOfThreads = numOfThreads; + rpcInit.cfp = dnodeProcessShellReq; + rpcInit.sessions = tsMaxShellConns; + rpcInit.connType = TAOS_CONN_SERVER; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.afp = dnodeRetrieveUserAuthInfo; + + tsTrans.shellRpc = rpcOpen(&rpcInit); + if (tsTrans.shellRpc == NULL) { + dError("failed to init shell rpc server"); + return -1; + } + + dInfo("dnode shell rpc server is initialized"); + return 0; +} + +static void dnodeCleanupShell() { + if (tsTrans.shellRpc) { + rpcClose(tsTrans.shellRpc); + tsTrans.shellRpc = NULL; + } +} + +int32_t dnodeInitTrans() { + if (dnodeInitClient() != 0) { + return -1; + } + + if (dnodeInitServer() != 0) { + return -1; + } + + if (dnodeInitShell() != 0) { + return -1; + } + + return 0; +} + +void dnodeCleanupTrans() { + dnodeCleanupShell(); + dnodeCleanupServer(); + dnodeCleanupClient(); +} diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c new file mode 100644 index 0000000000..e69de29bb2 diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c new file mode 100644 index 0000000000..aa66ab3da3 --- /dev/null +++ b/source/server/dnode/src/dnodeInt.c @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dnodeCheck.h" +#include "dnodeConfig.h" +#include "dnodeDnode.h" +#include "dnodeTransport.h" +#include "mnode.h" +#include "sync.h" +#include "tcache.h" +#include "tconfig.h" +#include "tnote.h" +#include "tstep.h" +#include "vnode.h" +#include "wal.h" + +static struct { + EDnStat runStatus; + SStartupStep startup; + SSteps *steps; +} tsDnode; + +EDnStat dnodeGetRunStat() { return tsDnode.runStatus; } + +void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; } + +void dnodeReportStartup(char *name, char *desc) { + SStartupStep *startup = &tsDnode.startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 0; +} + +static void dnodeReportStartupFinished(char *name, char *desc) { + SStartupStep *startup = &tsDnode.startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 1; +} + +void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); } + +static int32_t dnodeInitVnode() { + return vnodeInit(); +} + +static int32_t dnodeInitMnode() { + SMnodePara para; + para.fp.GetDnodeEp = dnodeGetEp; + para.fp.SendMsgToDnode = dnodeSendMsgToDnode; + para.fp.SendMsgToMnode = dnodeSendMsgToMnode; + para.fp.SendRedirectMsg = dnodeSendRedirectMsg; + para.dnodeId = dnodeGetDnodeId(); + para.clusterId = dnodeGetClusterId(); + + return mnodeInit(para); +} + +static int32_t dnodeInitTfs() {} + +static int32_t dnodeInitMain() { + tsDnode.runStatus = DN_RUN_STAT_STOPPED; + tscEmbedded = 1; + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + taosInitGlobalCfg(); + taosReadGlobalLogCfg(); + taosSetCoreDump(tsEnableCoreFile); + + if (!taosMkDir(tsLogDir)) { + printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); + return -1; + } + + char temp[TSDB_FILENAME_LEN]; + sprintf(temp, "%s/taosdlog", tsLogDir); + if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { + printf("failed to init log file\n"); + } + + if (!taosReadGlobalCfg()) { + taosPrintGlobalCfg(); + dError("TDengine read global config failed"); + return -1; + } + + dInfo("start to initialize TDengine"); + + taosInitNotes(); + + return taosCheckGlobalCfg(); +} + +static void dnodeCleanupMain() { + taos_cleanup(); + taosCloseLog(); + taosStopCacheRefreshWorker(); +} + +static int32_t dnodeCheckRunning(char *dir) { + char filepath[256] = {0}; + snprintf(filepath, sizeof(filepath), "%s/.running", dir); + + FileFd fd = taosOpenFileCreateWriteTrunc(filepath); + if (fd < 0) { + dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno)); + return -1; + } + + int32_t ret = taosLockFile(fd); + if (ret != 0) { + dError("failed to lock file:%s since %s, quit", filepath, strerror(errno)); + taosCloseFile(fd); + return -1; + } + + return 0; +} + +static int32_t dnodeInitDir() { + sprintf(tsMnodeDir, "%s/mnode", tsDataDir); + sprintf(tsVnodeDir, "%s/vnode", tsDataDir); + sprintf(tsDnodeDir, "%s/dnode", tsDataDir); + + if (!taosMkDir(tsDnodeDir)) { + dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); + return -1; + } + + if (!taosMkDir(tsMnodeDir)) { + dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); + return -1; + } + + if (!taosMkDir(tsVnodeDir)) { + dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno)); + return -1; + } + + if (dnodeCheckRunning(tsDnodeDir) != 0) { + return -1; + } + + return 0; +} + +static void dnodeCleanupDir() {} + +int32_t dnodeInit() { + SSteps *steps = taosStepInit(24, dnodeReportStartup); + if (steps == NULL) return -1; + + taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); + taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir); + taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck); + taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); + taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL); + taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); + taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); + taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps); + taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup); + taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup); + taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); + taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg); + + tsDnode.steps = steps; + taosStepExec(tsDnode.steps); + + dnodeSetRunStat(DN_RUN_STAT_RUNNING); + dnodeReportStartupFinished("TDengine", "initialized successfully"); + dInfo("TDengine is initialized successfully"); + + return 0; +} + +void dnodeCleanup() { + if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) { + dnodeSetRunStat(DN_RUN_STAT_STOPPED); + taosStepCleanup(tsDnode.steps); + tsDnode.steps = NULL; + } +} + +// tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; +// tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; +// tsVnode.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; +// tsVnode.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; +// tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; +// tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeMsg.c b/source/server/dnode/src/dnodeMsg.c similarity index 99% rename from source/dnode/mgmt/src/dnodeMsg.c rename to source/server/dnode/src/dnodeMsg.c index a5c3db5e14..0a598527b4 100644 --- a/source/dnode/mgmt/src/dnodeMsg.c +++ b/source/server/dnode/src/dnodeMsg.c @@ -14,8 +14,8 @@ */ #define _DEFAULT_SOURCE -#include "dnodeMsg.h" -#include "dnodeEps.h" +#include "dnodeDnode.h" +#include "dnodeConfig.h" #include "mnode.h" #include "tthread.h" #include "ttime.h" diff --git a/source/dnode/mgmt/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c similarity index 99% rename from source/dnode/mgmt/src/dnodeTrans.c rename to source/server/dnode/src/dnodeTrans.c index 3e32510df9..a9e130cd58 100644 --- a/source/dnode/mgmt/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -20,9 +20,9 @@ */ #define _DEFAULT_SOURCE -#include "dnodeTrans.h" -#include "dnodeEps.h" -#include "dnodeMsg.h" +#include "dnodeTransport.h" +#include "dnodeConfig.h" +#include "dnodeDnode.h" #include "mnode.h" #include "vnode.h" -- GitLab