Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d0a4c4ad
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d0a4c4ad
编写于
11月 02, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
11月 02, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #8538 from taosdata/feature/dnode3
Feature/dnode3
上级
85e321f0
cb539ad5
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
455 addition
and
472 deletion
+455
-472
include/common/taosmsg.h
include/common/taosmsg.h
+62
-81
include/common/tglobal.h
include/common/tglobal.h
+1
-1
include/server/vnode/vnode.h
include/server/vnode/vnode.h
+0
-4
source/common/src/tglobal.c
source/common/src/tglobal.c
+5
-5
source/dnode/mgmt/inc/dnodeDnode.h
source/dnode/mgmt/inc/dnodeDnode.h
+5
-14
source/dnode/mgmt/inc/dnodeInt.h
source/dnode/mgmt/inc/dnodeInt.h
+1
-0
source/dnode/mgmt/inc/dnodeMnode.h
source/dnode/mgmt/inc/dnodeMnode.h
+1
-0
source/dnode/mgmt/inc/dnodeVnodes.h
source/dnode/mgmt/inc/dnodeVnodes.h
+2
-0
source/dnode/mgmt/src/dnodeDnode.c
source/dnode/mgmt/src/dnodeDnode.c
+273
-266
source/dnode/mgmt/src/dnodeMnode.c
source/dnode/mgmt/src/dnodeMnode.c
+8
-1
source/dnode/mgmt/src/dnodeTransport.c
source/dnode/mgmt/src/dnodeTransport.c
+84
-91
source/dnode/mgmt/src/dnodeVnodes.c
source/dnode/mgmt/src/dnodeVnodes.c
+5
-1
source/dnode/mnode/inc/mnodeDef.h
source/dnode/mnode/inc/mnodeDef.h
+1
-1
source/dnode/mnode/src/mnodeWorker.c
source/dnode/mnode/src/mnodeWorker.c
+6
-6
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+1
-1
未找到文件。
include/common/taosmsg.h
浏览文件 @
d0a4c4ad
...
...
@@ -65,30 +65,6 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY8
,
"dummy8"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY9
,
"dummy9"
)
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_STABLE_IN
,
"create-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_STABLE_IN
,
"alter-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_STABLE_IN
,
"drop-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_VNODE_IN
,
"create-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_VNODE_IN
,
"alter-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_VNODE_IN
,
"drop-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_SYNC_VNODE_IN
,
"sync-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_COMPACT_VNODE_IN
,
"compact-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_MNODE_IN
,
"create-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_MNODE_IN
,
"drop-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CONFIG_DNODE_IN
,
"config-dnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY10
,
"dummy10"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY11
,
"dummy11"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY12
,
"dummy12"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY13
,
"dummy13"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY14
,
"dummy14"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY15
,
"dummy15"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY16
,
"dummy16"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY17
,
"dummy17"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY18
,
"dummy18"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY19
,
"dummy19"
)
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CONNECT
,
"connect"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_ACCT
,
"create-acct"
)
...
...
@@ -121,6 +97,29 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_SHOW_RETRIEVE
,
"retrieve"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC
,
"retrieve-func"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_COMPACT_VNODE
,
"compact-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY10
,
"dummy10"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY11
,
"dummy11"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY12
,
"dummy12"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY13
,
"dummy13"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY14
,
"dummy14"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY15
,
"dummy15"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY16
,
"dummy16"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY17
,
"dummy17"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY18
,
"dummy18"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY19
,
"dummy19"
)
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_STABLE_IN
,
"create-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_STABLE_IN
,
"alter-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_STABLE_IN
,
"drop-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_VNODE_IN
,
"create-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_VNODE_IN
,
"alter-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_VNODE_IN
,
"drop-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_SYNC_VNODE_IN
,
"sync-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_COMPACT_VNODE_IN
,
"compact-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_MNODE_IN
,
"create-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_MNODE_IN
,
"drop-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CONFIG_DNODE_IN
,
"config-dnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY20
,
"dummy20"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY21
,
"dummy21"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY22
,
"dummy22"
)
...
...
@@ -133,9 +132,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY29
,
"dummy29"
)
// message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
DM_
STATUS
,
"status"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
DM_
GRANT
,
"grant"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
DM_
AUTH
,
"auth"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_STATUS
,
"status"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_GRANT
,
"grant"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_AUTH
,
"auth"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY30
,
"dummy30"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY31
,
"dummy31"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY32
,
"dummy32"
)
...
...
@@ -585,20 +584,6 @@ typedef struct SRetrieveTableRsp {
char
data
[];
}
SRetrieveTableRsp
;
typedef
struct
{
int32_t
vgId
;
int32_t
dbCfgVersion
;
int64_t
totalStorage
;
int64_t
compStorage
;
int64_t
pointsWritten
;
uint64_t
vnodeVersion
;
int32_t
vgCfgVersion
;
uint8_t
status
;
uint8_t
role
;
uint8_t
replica
;
uint8_t
compact
;
}
SVnodeLoad
;
typedef
struct
{
char
db
[
TSDB_ACCT_ID_LEN
+
TSDB_DB_NAME_LEN
];
int32_t
cacheBlockSize
;
//MB
...
...
@@ -665,28 +650,47 @@ typedef struct {
uint8_t
ignoreNotExists
;
}
SDropDbMsg
,
SUseDbMsg
,
SSyncDbMsg
;
// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed
// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE
typedef
struct
{
int64_t
pointsWritten
;
// In unit of points
int64_t
totalStorage
;
// In unit of bytes
int64_t
compStorage
;
// In unit of bytes
int64_t
queryTime
;
// In unit of second ??
char
reserved
[
64
];
}
SVnodeStatisticInfo
;
int32_t
statusInterval
;
int8_t
reserved
[
4
];
int64_t
checkTime
;
// 1970-01-01 00:00:00.000
char
timezone
[
TSDB_TIMEZONE_LEN
];
// tsTimezone
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
char
charset
[
TSDB_LOCALE_LEN
];
// tsCharset
}
SClusterCfg
;
typedef
struct
SVgroupAccess
{
int32_t
vgId
;
int8_t
accessState
;
}
SVgroupAccess
;
typedef
struct
{
int32_t
vgId
;
int8_t
status
;
int8_t
role
;
int8_t
reserved
[
2
];
int64_t
totalStorage
;
int64_t
compStorage
;
int64_t
pointsWritten
;
int64_t
tablesNum
;
}
SVnodeLoad
;
typedef
struct
{
int32_t
vnodeNum
;
SVnodeLoad
vnodeLoads
[];
}
SVnodeLoads
;
typedef
struct
SStatusMsg
{
uint32_t
sversion
;
int32_t
dnodeId
;
int64_t
clusterId
;
uint32_t
rebootTime
;
// time stamp for last reboot
int32_t
numOfCores
;
char
dnodeEp
[
TSDB_EP_LEN
];
SClusterCfg
clusterCfg
;
SVnodeLoads
vnodeLoads
;
}
SStatusMsg
;
typedef
struct
{
int32_t
dnodeId
;
int8_t
dropped
;
char
reserved
[
19
];
char
reserved
[
3
];
int64_t
clusterId
;
int32_t
numOfDnodes
;
int32_t
numOfVnodes
;
}
SDnodeCfg
;
typedef
struct
{
...
...
@@ -703,31 +707,8 @@ typedef struct {
}
SDnodeEps
;
typedef
struct
{
int32_t
statusInterval
;
// tsStatusInterval
int8_t
reserved
[
36
];
int64_t
checkTime
;
// 1970-01-01 00:00:00.000
char
timezone
[
64
];
// tsTimezone
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
char
charset
[
TSDB_LOCALE_LEN
];
// tsCharset
}
SClusterCfg
;
typedef
struct
SStatusMsg
{
uint32_t
version
;
int32_t
dnodeId
;
uint32_t
lastReboot
;
// time stamp for last reboot
int32_t
openVnodes
;
int32_t
numOfCores
;
float
diskAvailable
;
int8_t
reserved
[
36
];
char
dnodeEp
[
TSDB_EP_LEN
];
int64_t
clusterId
;
SClusterCfg
clusterCfg
;
SVnodeLoad
load
[];
}
SStatusMsg
;
typedef
struct
{
SDnodeCfg
dnodeCfg
;
SVgroupAccess
vgAccess
[];
SDnodeCfg
dnodeCfg
;
SDnodeEps
dnodeEps
;
}
SStatusRsp
;
typedef
struct
{
...
...
include/common/tglobal.h
浏览文件 @
d0a4c4ad
...
...
@@ -198,7 +198,7 @@ extern SDiskCfg tsDiskCfg[];
void
taosInitGlobalCfg
();
int32_t
taosCheckGlobalCfg
();
bool
taosCfgDynamicOptions
(
char
*
msg
);
int32_t
taosCfgDynamicOptions
(
char
*
msg
);
int
taosGetFqdnPortFromEp
(
const
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
bool
taosCheckBalanceCfgOptions
(
const
char
*
option
,
int32_t
*
vnodeId
,
int32_t
*
dnodeId
);
void
taosAddDataDir
(
int
index
,
char
*
v1
,
int
level
,
int
primary
);
...
...
include/server/vnode/vnode.h
浏览文件 @
d0a4c4ad
...
...
@@ -58,10 +58,6 @@ typedef struct {
int8_t
syncRole
;
}
SVnodeStatus
;
typedef
struct
{
int32_t
accessState
;
}
SVnodeAccess
;
typedef
struct
SVnodeMsg
{
int32_t
msgType
;
int32_t
code
;
...
...
source/common/src/tglobal.c
浏览文件 @
d0a4c4ad
...
...
@@ -277,13 +277,13 @@ void taosSetAllDebugFlag() {
}
}
bool
taosCfgDynamicOptions
(
char
*
msg
)
{
int32_t
taosCfgDynamicOptions
(
char
*
msg
)
{
char
*
option
,
*
value
;
int32_t
olen
,
vlen
;
int32_t
vint
=
0
;
paGetToken
(
msg
,
&
option
,
&
olen
);
if
(
olen
==
0
)
return
false
;;
if
(
olen
==
0
)
return
-
1
;;
paGetToken
(
option
+
olen
+
1
,
&
value
,
&
vlen
);
if
(
vlen
==
0
)
...
...
@@ -324,18 +324,18 @@ bool taosCfgDynamicOptions(char *msg) {
uError
(
"monitor can't be updated, for monitor not initialized"
);
}
}
return
true
;
return
0
;
}
if
(
strncasecmp
(
cfg
->
option
,
"debugFlag"
,
olen
)
==
0
)
{
taosSetAllDebugFlag
();
}
return
true
;
return
0
;
}
if
(
strncasecmp
(
option
,
"resetlog"
,
8
)
==
0
)
{
taosResetLog
();
taosPrintGlobalCfg
();
return
true
;
return
0
;
}
if
(
strncasecmp
(
option
,
"resetQueryCache"
,
15
)
==
0
)
{
...
...
source/dnode/mgmt/inc/dnodeDnode.h
浏览文件 @
d0a4c4ad
...
...
@@ -23,23 +23,14 @@ extern "C" {
int32_t
dnodeInitDnode
();
void
dnodeCleanupDnode
();
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
);
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessDnodeMsg
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
);
int32_t
dnodeInitConfig
();
void
dnodeCleanupConfig
();
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
);
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
);
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
pEpSet
);
int32_t
dnodeGetDnodeId
();
int64_t
dnodeGetClusterId
();
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
);
void
dnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
epSet
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/inc/dnodeInt.h
浏览文件 @
d0a4c4ad
...
...
@@ -35,6 +35,7 @@ extern int32_t dDebugFlag;
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef
enum
{
DN_RUN_STAT_INIT
,
DN_RUN_STAT_RUNNING
,
DN_RUN_STAT_STOPPED
}
EDnStat
;
typedef
void
(
*
MsgFp
)(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
);
int32_t
dnodeInit
();
void
dnodeCleanup
();
...
...
source/dnode/mgmt/inc/dnodeMnode.h
浏览文件 @
d0a4c4ad
...
...
@@ -23,6 +23,7 @@ extern "C" {
int32_t
dnodeInitMnode
();
void
dnodeCleanupMnode
();
void
dnodeProcessMnodeMsg
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
);
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
);
...
...
source/dnode/mgmt/inc/dnodeVnodes.h
浏览文件 @
d0a4c4ad
...
...
@@ -23,6 +23,8 @@ extern "C" {
int32_t
dnodeInitVnodes
();
void
dnodeCleanupVnodes
();
void
dnodeProcessVnodesMsg
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
);
void
dnodeGetVnodes
(
SVnodeLoads
*
pVloads
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/src/dnodeDnode.c
浏览文件 @
d0a4c4ad
...
...
@@ -16,70 +16,83 @@
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "tthread.h"
#include "ttime.h"
#include "dnodeVnodes.h"
#include "cJSON.h"
#include "thash.h"
#include "tthread.h"
#include "ttime.h"
static
struct
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
SDnodeEps
*
dnodeEps
;
SHashObj
*
dnodeHash
;
SRpcEpSet
mnodeEpSetForShell
;
SRpcEpSet
mnodeEpSetForPeer
;
char
file
[
PATH_MAX
+
20
];
uint32_t
rebootTime
;
int8_t
dropped
;
int8_t
threadStop
;
pthread_t
*
threadId
;
pthread_mutex_t
mutex
;
}
tsConfig
;
MsgFp
msgFp
[
TSDB_MSG_TYPE_MAX
];
}
tsDnode
=
{
0
};
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
*
epSet
=
tsConfig
.
mnodeEpSetForPeer
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
int32_t
dnodeGetDnodeId
()
{
int32_t
dnodeId
=
0
;
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
dnodeId
=
tsDnode
.
dnodeId
;
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
return
dnodeId
;
}
static
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
*
epSet
=
tsConfig
.
mnodeEpSetForShell
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
int64_t
dnodeGetClusterId
()
{
int64_t
clusterId
=
0
;
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
clusterId
=
tsDnode
.
clusterId
;
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
return
clusterId
;
}
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
ep
)
{
if
(
ep
!=
NULL
||
ep
->
numOfEps
<=
0
)
{
dError
(
"mnode is changed, but content is invalid, discard it"
);
return
;
}
void
dnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
)
{
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
pEp
=
taosHashGet
(
tsDnode
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pEp
!=
NULL
)
{
if
(
port
)
*
port
=
pEp
->
dnodePort
;
if
(
fqdn
)
tstrncpy
(
fqdn
,
pEp
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
if
(
ep
)
snprintf
(
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
pEp
->
dnodeFqdn
,
pEp
->
dnodePort
);
}
dInfo
(
"mnode is changed, num:%d use:%d"
,
ep
->
numOfEps
,
ep
->
inUse
);
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
tsConfig
.
mnodeEpSetForPeer
=
*
ep
;
for
(
int32_t
i
=
0
;
i
<
ep
->
numOfEps
;
++
i
)
{
ep
->
port
[
i
]
-=
TSDB_PORT_DNODEDNODE
;
dInfo
(
"mnode index:%d %s:%u"
,
i
,
ep
->
fqdn
[
i
],
ep
->
port
[
i
]);
}
tsConfig
.
mnodeEpSetForShell
=
*
ep
;
void
dnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
pEpSet
)
{
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
*
pEpSet
=
tsDnode
.
mnodeEpSetForPeer
;
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
void
dnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
pEpSet
)
{
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
*
pEpSet
=
tsDnode
.
mnodeEpSetForShell
;
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
SRpcConnInfo
connInfo
=
{
0
};
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
pMsg
,
bool
forShell
)
{
int32_t
msgType
=
pMsg
->
msgType
;
SRpcEpSet
epSet
=
{
0
};
if
(
forShell
)
{
dnodeGetEpSetForShell
(
&
epSet
);
dnodeGet
Mnode
EpSetForShell
(
&
epSet
);
}
else
{
dnodeGetEpSetForPeer
(
&
epSet
);
dnodeGet
Mnode
EpSetForPeer
(
&
epSet
);
}
dDebug
(
"
msg:%s will be redirected, num:%d use:%d"
,
taosMsg
[
rpcMsg
->
msgType
],
epSet
.
numOfEps
,
epSet
.
inUse
);
dDebug
(
"
RPC %p, msg:%s is redirected, num:%d use:%d"
,
pMsg
->
handle
,
taosMsg
[
msgType
],
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%
d
"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
dDebug
(
"mnode index:%d %s:%
u
"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
tsLocalFqdn
)
==
0
)
{
if
((
epSet
.
port
[
i
]
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
&&
!
forShell
)
||
(
epSet
.
port
[
i
]
==
tsServerPort
&&
forShell
))
{
...
...
@@ -91,71 +104,88 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
epSet
.
port
[
i
]
=
htons
(
epSet
.
port
[
i
]);
}
rpcSendRedirectRsp
(
rpcMsg
->
handle
,
&
epSet
);
rpcSendRedirectRsp
(
pMsg
->
handle
,
&
epSet
);
}
static
void
dnodeUpdateMnodeEpSet
(
SRpcEpSet
*
pEpSet
)
{
if
(
pEpSet
==
NULL
||
pEpSet
->
numOfEps
<=
0
)
{
dError
(
"mnode is changed, but content is invalid, discard it"
);
return
;
}
else
{
dInfo
(
"mnode is changed, num:%d use:%d"
,
pEpSet
->
numOfEps
,
pEpSet
->
inUse
);
}
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
tsDnode
.
mnodeEpSetForPeer
=
*
pEpSet
;
for
(
int32_t
i
=
0
;
i
<
pEpSet
->
numOfEps
;
++
i
)
{
pEpSet
->
port
[
i
]
-=
TSDB_PORT_DNODEDNODE
;
dInfo
(
"mnode index:%d %s:%u"
,
i
,
pEpSet
->
fqdn
[
i
],
pEpSet
->
port
[
i
]);
}
tsDnode
.
mnodeEpSetForShell
=
*
pEpSet
;
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
static
void
dnodePrintEps
()
{
dDebug
(
"print dnode
list, num:%d"
,
tsConfig
.
dnodeEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
ts
Config
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
ts
Config
.
dnodeEps
->
dnodeEps
[
i
];
dDebug
(
"print dnode
endpoint list, num:%d"
,
tsDnode
.
dnodeEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
ts
Dnode
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
ts
Dnode
.
dnodeEps
->
dnodeEps
[
i
];
dDebug
(
"dnode:%d, fqdn:%s port:%u isMnode:%d"
,
ep
->
dnodeId
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
,
ep
->
isMnode
);
}
}
static
void
dnodeResetEps
(
SDnodeEps
*
data
)
{
assert
(
data
!=
NULL
);
int32_t
size
=
sizeof
(
SDnodeEps
)
+
data
->
dnodeNum
*
sizeof
(
SDnodeEp
);
static
void
dnodeResetEps
(
SDnodeEps
*
pEps
)
{
assert
(
pEps
!=
NULL
);
int32_t
size
=
sizeof
(
SDnodeEps
)
+
pEps
->
dnodeNum
*
sizeof
(
SDnodeEp
);
if
(
data
->
dnodeNum
>
tsConfig
.
dnodeEps
->
dnodeNum
)
{
if
(
pEps
->
dnodeNum
>
tsDnode
.
dnodeEps
->
dnodeNum
)
{
SDnodeEps
*
tmp
=
calloc
(
1
,
size
);
if
(
tmp
==
NULL
)
return
;
tfree
(
ts
Config
.
dnodeEps
);
ts
Config
.
dnodeEps
=
tmp
;
tfree
(
ts
Dnode
.
dnodeEps
);
ts
Dnode
.
dnodeEps
=
tmp
;
}
if
(
ts
Config
.
dnodeEps
!=
data
)
{
memcpy
(
ts
Config
.
dnodeEps
,
data
,
size
);
if
(
ts
Dnode
.
dnodeEps
!=
pEps
)
{
memcpy
(
ts
Dnode
.
dnodeEps
,
pEps
,
size
);
}
tsConfig
.
mnodeEpSetForPeer
.
inUse
=
0
;
tsConfig
.
mnodeEpSetForShell
.
inUse
=
0
;
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
tsDnode
.
mnodeEpSetForPeer
.
inUse
=
0
;
tsDnode
.
mnodeEpSetForShell
.
inUse
=
0
;
int32_t
mIndex
=
0
;
for
(
int32_t
i
=
0
;
i
<
tsDnode
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsDnode
.
dnodeEps
->
dnodeEps
[
i
];
if
(
!
ep
->
isMnode
)
continue
;
if
(
i
ndex
>=
TSDB_MAX_REPLICA
)
continue
;
strcpy
(
ts
Config
.
mnodeEpSetForShell
.
fqdn
[
i
ndex
],
ep
->
dnodeFqdn
);
strcpy
(
ts
Config
.
mnodeEpSetForPeer
.
fqdn
[
i
ndex
],
ep
->
dnodeFqdn
);
ts
Config
.
mnodeEpSetForShell
.
port
[
i
ndex
]
=
ep
->
dnodePort
;
ts
Config
.
mnodeEpSetForShell
.
port
[
i
ndex
]
=
ep
->
dnodePort
+
tsDnodeDnodePort
;
i
ndex
++
;
if
(
mI
ndex
>=
TSDB_MAX_REPLICA
)
continue
;
strcpy
(
ts
Dnode
.
mnodeEpSetForShell
.
fqdn
[
mI
ndex
],
ep
->
dnodeFqdn
);
strcpy
(
ts
Dnode
.
mnodeEpSetForPeer
.
fqdn
[
mI
ndex
],
ep
->
dnodeFqdn
);
ts
Dnode
.
mnodeEpSetForShell
.
port
[
mI
ndex
]
=
ep
->
dnodePort
;
ts
Dnode
.
mnodeEpSetForShell
.
port
[
mI
ndex
]
=
ep
->
dnodePort
+
tsDnodeDnodePort
;
mI
ndex
++
;
}
for
(
int32_t
i
=
0
;
i
<
ts
Config
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
ts
Config
.
dnodeEps
->
dnodeEps
[
i
];
taosHashPut
(
ts
Config
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
for
(
int32_t
i
=
0
;
i
<
ts
Dnode
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
ts
Dnode
.
dnodeEps
->
dnodeEps
[
i
];
taosHashPut
(
ts
Dnode
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
}
dnodePrintEps
();
}
static
bool
dnodeIs
DnodeEpChanged
(
int32_t
dnodeId
,
char
*
eps
tr
)
{
static
bool
dnodeIs
EpChanged
(
int32_t
dnodeId
,
char
*
epS
tr
)
{
bool
changed
=
false
;
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsConfig
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
SDnodeEp
*
pEp
=
taosHashGet
(
tsDnode
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pEp
!=
NULL
)
{
char
epSaved
[
TSDB_EP_LEN
+
1
];
snprintf
(
epSaved
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
changed
=
strcmp
(
epstr
,
epSaved
)
!=
0
;
tstrncpy
(
epstr
,
epSaved
,
TSDB_EP_LEN
);
snprintf
(
epSaved
,
TSDB_EP_LEN
,
"%s:%u"
,
pEp
->
dnodeFqdn
,
pEp
->
dnodePort
);
changed
=
strcmp
(
epStr
,
epSaved
)
!=
0
;
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
return
changed
;
}
...
...
@@ -166,101 +196,101 @@ static int32_t dnodeReadEps() {
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
fp
=
fopen
(
ts
Config
.
file
,
"r"
);
fp
=
fopen
(
ts
Dnode
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
ts
Config
.
file
);
dDebug
(
"file %s not exist"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
ts
Config
.
file
);
dError
(
"failed to read %s since content is null"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
ts
Config
.
file
);
dError
(
"failed to read %s since invalid json format"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dnodeId not found"
,
ts
Config
.
file
);
dError
(
"failed to read %s since dnodeId not found"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
ts
Config
.
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
ts
Dnode
.
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"droppe
d"
);
if
(
!
dropped
||
droppe
d
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since
dropped not found"
,
tsConfig
.
file
);
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterI
d"
);
if
(
!
clusterId
||
clusterI
d
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since
clusterId not found"
,
tsDnode
.
file
);
goto
PRASE_EPS_OVER
;
}
ts
Config
.
dropped
=
atoi
(
droppe
d
->
valuestring
);
ts
Dnode
.
clusterId
=
atoll
(
clusterI
d
->
valuestring
);
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterI
d"
);
if
(
!
clusterId
||
clusterI
d
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since
clusterId not found"
,
tsConfig
.
file
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"droppe
d"
);
if
(
!
dropped
||
droppe
d
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since
dropped not found"
,
tsDnode
.
file
);
goto
PRASE_EPS_OVER
;
}
ts
Config
.
clusterId
=
atoll
(
clusterI
d
->
valuestring
);
ts
Dnode
.
dropped
=
atoi
(
droppe
d
->
valuestring
);
cJSON
*
dnodeInfos
=
cJSON_GetObjectItem
(
root
,
"dnodeInfos"
);
if
(
!
dnodeInfos
||
dnodeInfos
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since dnodeInfos not found"
,
ts
Config
.
file
);
dError
(
"failed to read %s since dnodeInfos not found"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
int32_t
dnodeInfosSize
=
cJSON_GetArraySize
(
dnodeInfos
);
if
(
dnodeInfosSize
<=
0
)
{
dError
(
"failed to read %s since dnodeInfos size:%d invalid"
,
ts
Config
.
file
,
dnodeInfosSize
);
dError
(
"failed to read %s since dnodeInfos size:%d invalid"
,
ts
Dnode
.
file
,
dnodeInfosSize
);
goto
PRASE_EPS_OVER
;
}
ts
Config
.
dnodeEps
=
calloc
(
1
,
dnodeInfosSize
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
));
if
(
ts
Config
.
dnodeEps
==
NULL
)
{
ts
Dnode
.
dnodeEps
=
calloc
(
1
,
dnodeInfosSize
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
));
if
(
ts
Dnode
.
dnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEpList since %s"
,
strerror
(
errno
));
goto
PRASE_EPS_OVER
;
}
ts
Config
.
dnodeEps
->
dnodeNum
=
dnodeInfosSize
;
ts
Dnode
.
dnodeEps
->
dnodeNum
=
dnodeInfosSize
;
for
(
int32_t
i
=
0
;
i
<
dnodeInfosSize
;
++
i
)
{
cJSON
*
dnodeInfo
=
cJSON_GetArrayItem
(
dnodeInfos
,
i
);
if
(
dnodeInfo
==
NULL
)
break
;
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
SDnodeEp
*
pEp
=
&
tsDnode
.
dnodeEps
->
dnodeEps
[
i
];
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodeId not found"
,
ts
Config
.
file
);
dError
(
"failed to read %s, dnodeId not found"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
e
p
->
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
pE
p
->
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
dnodeInfo
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, isMnode not found"
,
ts
Config
.
file
);
dError
(
"failed to read %s, isMnode not found"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
e
p
->
isMnode
=
atoi
(
isMnode
->
valuestring
);
pE
p
->
isMnode
=
atoi
(
isMnode
->
valuestring
);
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeFqdn"
);
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s, dnodeFqdn not found"
,
ts
Config
.
file
);
dError
(
"failed to read %s, dnodeFqdn not found"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
tstrncpy
(
e
p
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
tstrncpy
(
pE
p
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodePort"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodePort not found"
,
ts
Config
.
file
);
dError
(
"failed to read %s, dnodePort not found"
,
ts
Dnode
.
file
);
goto
PRASE_EPS_OVER
;
}
e
p
->
dnodePort
=
atoi
(
dnodePort
->
valuestring
);
pE
p
->
dnodePort
=
atoi
(
dnodePort
->
valuestring
);
}
dInfo
(
"succcessed to read file %s"
,
ts
Config
.
file
);
dInfo
(
"succcessed to read file %s"
,
ts
Dnode
.
file
);
dnodePrintEps
();
PRASE_EPS_OVER:
...
...
@@ -268,21 +298,21 @@ PRASE_EPS_OVER:
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
dnodeIs
DnodeEpChanged
(
tsConfig
.
dnodeId
,
tsLocalEp
))
{
dError
(
"
dnode:%d, localEp %s different with dnodeEps.json and need reconfigured"
,
tsConfig
.
dnodeId
,
tsLocalEp
);
if
(
dnodeIs
EpChanged
(
tsDnode
.
dnodeId
,
tsLocalEp
))
{
dError
(
"
localEp %s different with %s and need reconfigured"
,
tsLocalEp
,
tsDnode
.
file
);
return
-
1
;
}
dnodeResetEps
(
ts
Config
.
dnodeEps
);
dnodeResetEps
(
ts
Dnode
.
dnodeEps
);
terrno
=
0
;
return
0
;
}
static
int32_t
dnodeWriteEps
()
{
FILE
*
fp
=
fopen
(
ts
Config
.
file
,
"w"
);
FILE
*
fp
=
fopen
(
ts
Dnode
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
ts
Config
.
file
,
strerror
(
errno
));
dError
(
"failed to write %s since %s"
,
ts
Dnode
.
file
,
strerror
(
errno
));
return
-
1
;
}
...
...
@@ -291,17 +321,17 @@ static int32_t dnodeWriteEps() {
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
ts
Config
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsConfig
.
droppe
d
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
tsConfig
.
clusterI
d
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
ts
Dnode
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
tsDnode
.
clusterI
d
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsDnode
.
droppe
d
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeInfos
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
ts
Config
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
ts
Config
.
dnodeEps
->
dnodeEps
[
i
];
for
(
int32_t
i
=
0
;
i
<
ts
Dnode
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
ts
Dnode
.
dnodeEps
->
dnodeEps
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
ep
->
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
isMnode
\"
:
\"
%d
\"
,
\n
"
,
ep
->
isMnode
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeFqdn
\"
:
\"
%s
\"
,
\n
"
,
ep
->
dnodeFqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodePort
\"
:
\"
%u
\"\n
"
,
ep
->
dnodePort
);
if
(
i
<
ts
Config
.
dnodeEps
->
dnodeNum
-
1
)
{
if
(
i
<
ts
Dnode
.
dnodeEps
->
dnodeNum
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
...
...
@@ -315,150 +345,76 @@ static int32_t dnodeWriteEps() {
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
ts
Config
.
file
);
dInfo
(
"successed to write %s"
,
ts
Dnode
.
file
);
return
0
;
}
int32_t
dnodeInitConfig
()
{
tsConfig
.
dnodeId
=
0
;
tsConfig
.
dropped
=
0
;
tsConfig
.
clusterId
=
0
;
tsConfig
.
dnodeEps
=
NULL
;
snprintf
(
tsConfig
.
file
,
sizeof
(
tsConfig
.
file
),
"%s/dnodeEps.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsConfig
.
mutex
,
NULL
);
tsConfig
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsConfig
.
dnodeHash
==
NULL
)
return
-
1
;
int32_t
ret
=
dnodeReadEps
();
if
(
ret
==
0
)
{
dInfo
(
"dnode eps is initialized"
);
}
return
ret
;
}
void
dnodeCleanupConfig
()
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
if
(
tsConfig
.
dnodeEps
!=
NULL
)
{
free
(
tsConfig
.
dnodeEps
);
tsConfig
.
dnodeEps
=
NULL
;
}
if
(
tsConfig
.
dnodeHash
)
{
taosHashCleanup
(
tsConfig
.
dnodeHash
);
tsConfig
.
dnodeHash
=
NULL
;
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
pthread_mutex_destroy
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
)
{
if
(
data
==
NULL
||
data
->
dnodeNum
<=
0
)
return
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
if
(
data
->
dnodeNum
!=
tsConfig
.
dnodeEps
->
dnodeNum
)
{
dnodeResetEps
(
data
);
dnodeWriteEps
();
}
else
{
int32_t
size
=
data
->
dnodeNum
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
);
if
(
memcmp
(
tsConfig
.
dnodeEps
,
data
,
size
)
!=
0
)
{
dnodeResetEps
(
data
);
dnodeWriteEps
();
}
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsConfig
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
if
(
port
)
*
port
=
ep
->
dnodePort
;
if
(
fqdn
)
tstrncpy
(
fqdn
,
ep
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
if
(
epstr
)
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
)
{
if
(
tsConfig
.
dnodeId
!=
0
&&
!
data
->
dropped
)
return
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
tsConfig
.
dnodeId
=
data
->
dnodeId
;
tsConfig
.
clusterId
=
data
->
clusterId
;
tsConfig
.
dropped
=
data
->
dropped
;
dInfo
(
"dnodeId is set to %d, clusterId is set to %"
PRId64
,
data
->
dnodeId
,
data
->
clusterId
);
dnodeWriteEps
();
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
int32_t
dnodeGetDnodeId
()
{
int32_t
dnodeId
=
0
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
dnodeId
=
tsConfig
.
dnodeId
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
dnodeId
;
}
int64_t
dnodeGetClusterId
()
{
int64_t
clusterId
=
0
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
clusterId
=
tsConfig
.
clusterId
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
clusterId
;
}
static
struct
{
pthread_t
*
threadId
;
bool
threadStop
;
uint32_t
rebootTime
;
}
tsDnode
;
static
void
dnodeSendStatusMsg
()
{
int32_t
contLen
=
sizeof
(
SStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
int32_t
contLen
=
sizeof
(
SStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
SStatusMsg
*
pStatus
=
rpcMallocCont
(
contLen
);
if
(
pStatus
==
NULL
)
{
dError
(
"failed to malloc status message"
);
return
;
}
pStatus
->
version
=
htonl
(
tsVersion
);
pStatus
->
s
version
=
htonl
(
tsVersion
);
pStatus
->
dnodeId
=
htonl
(
dnodeGetDnodeId
());
tstrncpy
(
pStatus
->
dnodeEp
,
tsLocalEp
,
TSDB_EP_LEN
);
pStatus
->
clusterId
=
htobe64
(
dnodeGetClusterId
());
pStatus
->
lastReboot
=
htonl
(
tsDnode
.
rebootTime
);
pStatus
->
rebootTime
=
htonl
(
tsDnode
.
rebootTime
);
pStatus
->
numOfCores
=
htonl
(
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
tstrncpy
(
pStatus
->
dnodeEp
,
tsLocalEp
,
TSDB_EP_LEN
)
;
// fill cluster cfg parameters
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
tsStatusInterval
);
pStatus
->
clusterCfg
.
checkTime
=
0
;
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
tsTimezone
,
64
);
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
tsTimezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
tsLocale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
// vnodeGetStatus(NULL, pStatus);
// contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
// pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg
rpcMsg
=
{.
ahandle
=
NULL
,
.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TSDB_MSG_TYPE_DM_STATUS
};
dnodeGetVnodes
(
&
pStatus
->
vnodeLoads
);
contLen
=
sizeof
(
SStatusMsg
)
+
pStatus
->
vnodeLoads
.
vnodeNum
*
sizeof
(
SVnodeLoad
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TSDB_MSG_TYPE_STATUS
};
dnodeSendMsgToMnode
(
&
rpcMsg
);
}
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
dTrace
(
"status rsp is received, code:%s"
,
tstrerror
(
pMsg
->
code
));
static
void
dnodeUpdateCfg
(
SDnodeCfg
*
pCfg
)
{
if
(
tsDnode
.
dnodeId
==
0
)
return
;
if
(
tsDnode
.
dropped
)
return
;
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
tsDnode
.
dnodeId
=
pCfg
->
dnodeId
;
tsDnode
.
clusterId
=
pCfg
->
clusterId
;
tsDnode
.
dropped
=
pCfg
->
dropped
;
dInfo
(
"dnodeId is set to %d, clusterId is set to %"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
dnodeWriteEps
();
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
static
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
pEps
)
{
if
(
pEps
==
NULL
||
pEps
->
dnodeNum
<=
0
)
return
;
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
if
(
pEps
->
dnodeNum
!=
tsDnode
.
dnodeEps
->
dnodeNum
)
{
dnodeResetEps
(
pEps
);
dnodeWriteEps
();
}
else
{
int32_t
size
=
pEps
->
dnodeNum
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
);
if
(
memcmp
(
tsDnode
.
dnodeEps
,
pEps
,
size
)
!=
0
)
{
dnodeResetEps
(
pEps
);
dnodeWriteEps
();
}
}
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
static
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
return
;
SStatusRsp
*
pStatusRsp
=
pMsg
->
pCont
;
...
...
@@ -466,25 +422,40 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
SDnodeCfg
*
pCfg
=
&
pStatusRsp
->
dnodeCfg
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
clusterId
=
htobe64
(
pCfg
->
clusterId
);
pCfg
->
numOfVnodes
=
htonl
(
pCfg
->
numOfVnodes
);
pCfg
->
numOfDnodes
=
htonl
(
pCfg
->
numOfDnodes
);
dnodeUpdateCfg
(
pCfg
);
if
(
pCfg
->
dropped
)
{
dError
(
"status rsp is received, and set dnode to drop status"
);
return
;
if
(
pCfg
->
dropped
)
return
;
SDnodeEps
*
pEps
=
&
pStatusRsp
->
dnodeEps
;
pEps
->
dnodeNum
=
htonl
(
pEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pEps
->
dnodeNum
;
++
i
)
{
pEps
->
dnodeEps
[
i
].
dnodeId
=
htonl
(
pEps
->
dnodeEps
[
i
].
dnodeId
);
pEps
->
dnodeEps
[
i
].
dnodePort
=
htons
(
pEps
->
dnodeEps
[
i
].
dnodePort
);
}
// vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
dnodeUpdateDnodeEps
(
pEps
);
}
SDnodeEps
*
eps
=
(
SDnodeEps
*
)((
char
*
)
pStatusRsp
->
vgAccess
+
pCfg
->
numOfVnodes
*
sizeof
(
SVgroupAccess
));
eps
->
dnodeNum
=
htonl
(
eps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
eps
->
dnodeNum
;
++
i
)
{
eps
->
dnodeEps
[
i
].
dnodeId
=
htonl
(
eps
->
dnodeEps
[
i
].
dnodeId
);
eps
->
dnodeEps
[
i
].
dnodePort
=
htons
(
eps
->
dnodeEps
[
i
].
dnodePort
);
}
static
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
)
{
SCfgDnodeMsg
*
pCfg
=
pMsg
->
pCont
;
dnodeUpdateDnodeEps
(
eps
);
int32_t
code
=
taosCfgDynamicOptions
(
pCfg
->
config
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
)
{
dInfo
(
"startup msg is received, cont:%s"
,
(
char
*
)
pMsg
->
pCont
);
SStartupStep
*
pStep
=
rpcMallocCont
(
sizeof
(
SStartupStep
));
dnodeGetStartup
(
pStep
);
dInfo
(
"startup msg is sent, step:%s desc:%s finished:%d"
,
pStep
->
name
,
pStep
->
desc
,
pStep
->
finished
);
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
pStep
,
.
contLen
=
sizeof
(
SStartupStep
)};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
void
*
dnodeThreadRoutine
(
void
*
param
)
{
...
...
@@ -496,14 +467,34 @@ static void *dnodeThreadRoutine(void *param) {
}
int32_t
dnodeInitDnode
()
{
tsDnode
.
threadStop
=
false
;
tsDnode
.
dnodeId
=
0
;
tsDnode
.
clusterId
=
0
;
tsDnode
.
dnodeEps
=
NULL
;
snprintf
(
tsDnode
.
file
,
sizeof
(
tsDnode
.
file
),
"%s/dnode.json"
,
tsDnodeDir
);
tsDnode
.
rebootTime
=
taosGetTimestampSec
();
tsDnode
.
dropped
=
0
;
pthread_mutex_init
(
&
tsDnode
.
mutex
,
NULL
);
tsDnode
.
threadStop
=
false
;
tsDnode
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsDnode
.
dnodeHash
==
NULL
)
{
dError
(
"failed to init dnode hash"
);
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
tsDnode
.
threadId
=
taosCreateThread
(
dnodeThreadRoutine
,
NULL
);
if
(
tsDnode
.
threadId
==
NULL
)
{
return
-
1
;
dError
(
"failed to init dnode thread"
);
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
int32_t
code
=
dnodeReadEps
();
if
(
code
!=
0
)
{
dError
(
"failed to read dnode endpoint file since %s"
,
tstrerror
(
code
));
return
code
;
}
dInfo
(
"dnode
msg
is initialized"
);
dInfo
(
"dnode
-dnode
is initialized"
);
return
0
;
}
...
...
@@ -514,29 +505,45 @@ void dnodeCleanupDnode() {
tsDnode
.
threadId
=
NULL
;
}
dInfo
(
"dnode msg is cleanuped"
);
}
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
)
{
SCfgDnodeMsg
*
pCfg
=
pMsg
->
pCont
;
if
(
tsDnode
.
dnodeEps
!=
NULL
)
{
free
(
tsDnode
.
dnodeEps
);
tsDnode
.
dnodeEps
=
NULL
;
}
int32_t
code
=
taosCfgDynamicOptions
(
pCfg
->
config
);
if
(
tsDnode
.
dnodeHash
)
{
taosHashCleanup
(
tsDnode
.
dnodeHash
);
tsDnode
.
dnodeHash
=
NULL
;
}
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
pthread_mutex_destroy
(
&
tsDnode
.
mutex
);
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
dInfo
(
"dnode-dnode is cleaned up"
);
}
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
)
{
dInfo
(
"startup msg is received, cont:%s"
,
(
char
*
)
pMsg
->
pCont
);
SStartupStep
*
pStep
=
rpcMallocCont
(
sizeof
(
SStartupStep
));
dnodeGetStartup
(
pStep
);
dDebug
(
"startup msg is sent, step:%s desc:%s finished:%d"
,
pStep
->
name
,
pStep
->
desc
,
pStep
->
finished
);
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
pStep
,
.
contLen
=
sizeof
(
SStartupStep
)};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
void
dnodeProcessDnodeMsg
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
)
{
int32_t
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
TSDB_MSG_TYPE_STATUS_RSP
&&
pEpSet
)
{
dnodeUpdateMnodeEpSet
(
pEpSet
);
}
switch
(
msgType
)
{
case
TSDB_MSG_TYPE_NETWORK_TEST
:
dnodeProcessStartupReq
(
pMsg
);
break
;
case
TSDB_MSG_TYPE_CONFIG_DNODE_IN
:
dnodeProcessConfigDnodeReq
(
pMsg
);
break
;
case
TSDB_MSG_TYPE_STATUS_RSP
:
dnodeProcessStatusRsp
(
pMsg
);
break
;
default:
dError
(
"RPC %p, %s not processed"
,
pMsg
->
handle
,
taosMsg
[
msgType
]);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
code
=
TSDB_CODE_DND_MSG_NOT_PROCESSED
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
}
source/dnode/mgmt/src/dnodeMnode.c
浏览文件 @
d0a4c4ad
...
...
@@ -21,7 +21,7 @@
int32_t
dnodeInitMnode
()
{
SMnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGetEp
;
para
.
fp
.
GetDnodeEp
=
dnodeGet
Dnode
Ep
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
...
...
@@ -59,4 +59,11 @@ void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dnodeProcessMnodeMsg
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
)
{
mnodeProcessMsg
(
pMsg
);
// tsDnode.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
// tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessDropMnodeReq;
}
\ No newline at end of file
source/dnode/mgmt/src/dnodeTransport.c
浏览文件 @
d0a4c4ad
...
...
@@ -26,9 +26,6 @@
#include "dnodeVnodes.h"
#include "mnode.h"
#include "vnode.h"
typedef
void
(
*
MsgFp
)(
SRpcMsg
*
pMsg
);
static
struct
{
void
*
serverRpc
;
void
*
clientRpc
;
...
...
@@ -38,88 +35,88 @@ static struct {
static
void
dnodeInitMsgFp
()
{
// msg from client to dnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_QUERY
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_FETCH
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_TABLE
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_TABLE
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_TABLE
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_TABLE_META
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_TABLES_META
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STABLE_VGROUP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_QUERY
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONSUME
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONNECT
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_DISCONNECT
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_ACK
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_RESET
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_NETWORK_TEST
]
=
dnodeProcess
StartupReq
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_QUERY
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_FETCH
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_TABLE
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_TABLE
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_TABLE
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_TABLE_META
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_TABLES_META
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STABLE_VGROUP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_QUERY
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONSUME
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONNECT
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_DISCONNECT
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_ACK
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_RESET
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_NETWORK_TEST
]
=
dnodeProcess
DnodeMsg
;
// msg from client to mnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONNECT
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_ACCT
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_ACCT
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_ACCT
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_USER
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_USER
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_USER
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DNODE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DNODE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DB
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DB
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_USE_DB
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_DB
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_DB
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_TOPIC
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_TOPIC
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_TOPIC
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_FUNCTION
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_FUNCTION
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_FUNCTION
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_CONN
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONNECT
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_ACCT
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_ACCT
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_ACCT
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_USER
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_USER
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_USER
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DNODE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DNODE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DB
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DB
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_USE_DB
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_DB
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_DB
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_TOPIC
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_TOPIC
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_TOPIC
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_FUNCTION
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_FUNCTION
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_FUNCTION
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_CONN
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE
]
=
dnodeProcessMnode
Msg
;
// message from mnode to dnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN
]
=
vnodeProces
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN
]
=
dnodeProcess
CreateMnodeReq
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN
]
=
NULL
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN
]
=
NULL
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN
]
=
dnodeProcessVnode
sMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN
]
=
dnodeProcess
MnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN
]
=
dnodeProcessDnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP
]
=
dnodeProcessMnode
Msg
;
// message from dnode to mnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DM_AUTH
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DM_AUTH_RSP
]
=
NULL
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DM_GRANT
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DM_GRANT_RSP
]
=
NULL
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DM_STATUS
]
=
mnodeProcess
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DM_STATUS_RSP
]
=
dnodeProcessStatusRsp
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
AUTH
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
AUTH_RSP
]
=
dnodeProcessDnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
GRANT
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
GRANT_RSP
]
=
dnodeProcessDnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
STATUS
]
=
dnodeProcessMnode
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
STATUS_RSP
]
=
dnodeProcessDnodeMsg
;
}
static
void
dnodeProcessPeerReq
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
)
{
...
...
@@ -127,7 +124,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
int32_t
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
TSDB_MSG_TYPE_NETWORK_TEST
)
{
dnodeProcess
StartupReq
(
pMsg
);
dnodeProcess
DnodeMsg
(
pMsg
,
pEpSet
);
return
;
}
...
...
@@ -148,7 +145,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
MsgFp
fp
=
tsTrans
.
msgFp
[
msgType
];
if
(
fp
!=
NULL
)
{
dTrace
(
"RPC %p, peer req:%s will be processed"
,
pMsg
->
handle
,
taosMsg
[
msgType
]);
(
*
fp
)(
pMsg
);
(
*
fp
)(
pMsg
,
pEpSet
);
}
else
{
dError
(
"RPC %p, peer req:%s not processed"
,
pMsg
->
handle
,
taosMsg
[
msgType
]);
rspMsg
.
code
=
TSDB_CODE_DND_MSG_NOT_PROCESSED
;
...
...
@@ -196,14 +193,10 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return
;
}
if
(
msgType
==
TSDB_MSG_TYPE_DM_STATUS_RSP
&&
pEpSet
)
{
dnodeUpdateMnodeEps
(
pEpSet
);
}
MsgFp
fp
=
tsTrans
.
msgFp
[
msgType
];
if
(
fp
!=
NULL
)
{
dTrace
(
"RPC %p, peer rsp:%s will be processed
"
,
pMsg
->
handle
,
taosMsg
[
msgType
]
);
(
*
fp
)(
pMsg
);
dTrace
(
"RPC %p, peer rsp:%s will be processed
, code:%s"
,
pMsg
->
handle
,
taosMsg
[
msgType
],
tstrerror
(
pMsg
->
code
)
);
(
*
fp
)(
pMsg
,
pEpSet
);
}
else
{
dDebug
(
"RPC %p, peer rsp:%s not processed"
,
pMsg
->
handle
,
taosMsg
[
msgType
]);
}
...
...
@@ -270,7 +263,7 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
MsgFp
fp
=
tsTrans
.
msgFp
[
msgType
];
if
(
fp
!=
NULL
)
{
dTrace
(
"RPC %p, shell req:%s will be processed"
,
pMsg
->
handle
,
taosMsg
[
msgType
]);
(
*
fp
)(
pMsg
);
(
*
fp
)(
pMsg
,
pEpSet
);
}
else
{
dError
(
"RPC %p, shell req:%s is not processed"
,
pMsg
->
handle
,
taosMsg
[
msgType
]);
rspMsg
.
code
=
TSDB_CODE_DND_MSG_NOT_PROCESSED
;
...
...
@@ -283,13 +276,13 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsT
void
dnodeSendMsgToMnode
(
SRpcMsg
*
rpcMsg
)
{
SRpcEpSet
epSet
=
{
0
};
dnodeGetEpSetForPeer
(
&
epSet
);
dnodeGet
Mnode
EpSetForPeer
(
&
epSet
);
dnodeSendMsgToDnode
(
&
epSet
,
rpcMsg
);
}
static
void
dnodeSendMsgToMnodeRecv
(
SRpcMsg
*
rpcMsg
,
SRpcMsg
*
rpcRsp
)
{
SRpcEpSet
epSet
=
{
0
};
dnodeGetEpSetForPeer
(
&
epSet
);
dnodeGet
Mnode
EpSetForPeer
(
&
epSet
);
rpcSendRecv
(
tsTrans
.
clientRpc
,
&
epSet
,
rpcMsg
,
rpcRsp
);
}
...
...
@@ -303,7 +296,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pMsg
;
rpcMsg
.
contLen
=
sizeof
(
SAuthMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_
DM_
AUTH
;
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_AUTH
;
dDebug
(
"user:%s, send auth msg to mnodes"
,
user
);
SRpcMsg
rpcRsp
=
{
0
};
...
...
source/dnode/mgmt/src/dnodeVnodes.c
浏览文件 @
d0a4c4ad
...
...
@@ -19,4 +19,8 @@
int32_t
dnodeInitVnodes
()
{
return
vnodeInit
();
}
void
dnodeCleanupVnodes
()
{
vnodeCleanup
();
}
\ No newline at end of file
void
dnodeCleanupVnodes
()
{
vnodeCleanup
();
}
void
dnodeProcessVnodesMsg
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
)
{
vnodeProcessMsg
(
NULL
,
NULL
);
}
void
dnodeGetVnodes
(
SVnodeLoads
*
pVloads
)
{}
\ No newline at end of file
source/dnode/mnode/inc/mnodeDef.h
浏览文件 @
d0a4c4ad
...
...
@@ -120,7 +120,7 @@ typedef struct SDnodeObj {
int64_t
createdTime
;
int64_t
updateTime
;
int64_t
lastAccess
;
int64_t
lastReboot
;
// time stamp for last reboot
int64_t
rebootTime
;
// time stamp for last reboot
char
fqdn
[
TSDB_FQDN_LEN
];
char
ep
[
TSDB_EP_LEN
];
uint16_t
port
;
...
...
source/dnode/mnode/src/mnodeWorker.c
浏览文件 @
d0a4c4ad
...
...
@@ -171,12 +171,12 @@ static void mnodeInitMsgFp() {
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_
DM_
AUTH] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_
DM_
AUTH] = mnodeProcessAuthMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_
DM_
GRANT] = mnodeDispatchToPeerQueue;
// // tsMworker.peerReqFp[TSDB_MSG_TYPE_
DM_
GRANT] = grantProcessMsgInMgmt;
// tsMworker.msgFp[TSDB_MSG_TYPE_
DM_
STATUS] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_
DM_
STATUS] = mnodeProcessDnodeStatusMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_AUTH] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_AUTH] = mnodeProcessAuthMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_GRANT] = mnodeDispatchToPeerQueue;
// // tsMworker.peerReqFp[TSDB_MSG_TYPE_GRANT] = grantProcessMsgInMgmt;
// tsMworker.msgFp[TSDB_MSG_TYPE_STATUS] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_STATUS] = mnodeProcessDnodeStatusMsg;
// // peer rsp
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue;
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
d0a4c4ad
...
...
@@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
if
(
type
==
TSDB_MSG_TYPE_QUERY
||
type
==
TSDB_MSG_TYPE_SHOW_RETRIEVE
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_STABLE_VGROUP
||
type
==
TSDB_MSG_TYPE_TABLES_META
||
type
==
TSDB_MSG_TYPE_TABLE_META
||
type
==
TSDB_MSG_TYPE_SHOW
||
type
==
TSDB_MSG_TYPE_
DM_
STATUS
||
type
==
TSDB_MSG_TYPE_ALTER_TABLE
)
||
type
==
TSDB_MSG_TYPE_SHOW
||
type
==
TSDB_MSG_TYPE_STATUS
||
type
==
TSDB_MSG_TYPE_ALTER_TABLE
)
pContext
->
connType
=
RPC_CONN_TCPC
;
pContext
->
rid
=
taosAddRef
(
tsRpcRefId
,
pContext
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录