Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
3f88174a
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3f88174a
编写于
5月 11, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/tsim
上级
270e7fc9
8c0ba3b9
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
42 addition
and
44 deletion
+42
-44
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+7
-1
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+2
-2
src/dnode/src/dnodePeer.c
src/dnode/src/dnodePeer.c
+6
-6
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+2
-2
src/inc/trpc.h
src/inc/trpc.h
+1
-4
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+6
-7
src/rpc/test/rclient.c
src/rpc/test/rclient.c
+3
-10
src/rpc/test/rsclient.c
src/rpc/test/rsclient.c
+0
-8
src/rpc/test/rserver.c
src/rpc/test/rserver.c
+1
-1
tests/script/unique/mnode/mgmt22.sim
tests/script/unique/mnode/mgmt22.sim
+13
-2
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
3f88174a
...
...
@@ -365,7 +365,7 @@ void tscInitMsgsFp();
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
multiVnodeInsertion
);
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
);
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcIpSet
*
pIpSet
);
int
tscProcessSql
(
SSqlObj
*
pSql
);
int
tscRenewMeterMeta
(
SSqlObj
*
pSql
,
char
*
tableId
);
...
...
src/client/src/tscServer.c
浏览文件 @
3f88174a
...
...
@@ -221,7 +221,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return
TSDB_CODE_SUCCESS
;
}
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
)
{
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcIpSet
*
pIpSet
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
rpcMsg
->
handle
;
if
(
pSql
==
NULL
)
{
tscError
(
"%p sql is already released"
,
pSql
->
signature
);
...
...
@@ -245,6 +245,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
return
;
}
if
(
pCmd
->
command
<
TSDB_SQL_MGMT
)
{
if
(
pIpSet
)
pSql
->
ipList
=
*
pIpSet
;
}
else
{
if
(
pIpSet
)
tscMgmtIpSet
=
*
pIpSet
;
}
if
(
rpcMsg
->
pCont
==
NULL
)
{
rpcMsg
->
code
=
TSDB_CODE_NETWORK_UNAVAIL
;
}
else
{
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
3f88174a
...
...
@@ -266,8 +266,8 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
return
taosCfgDynamicOptions
(
pCfg
->
config
);
}
void
dnodeUpdateIpSet
(
void
*
ahandle
,
SRpcIpSet
*
pIpSet
)
{
dPrint
(
"mnode IP list is changed
for ufp is called
, numOfIps:%d inUse:%d"
,
pIpSet
->
numOfIps
,
pIpSet
->
inUse
);
void
dnodeUpdateIpSet
(
SRpcIpSet
*
pIpSet
)
{
dPrint
(
"mnode IP list is changed, numOfIps:%d inUse:%d"
,
pIpSet
->
numOfIps
,
pIpSet
->
inUse
);
for
(
int
i
=
0
;
i
<
pIpSet
->
numOfIps
;
++
i
)
{
dPrint
(
"mnode index:%d %s:%u"
,
i
,
pIpSet
->
fqdn
[
i
],
pIpSet
->
port
[
i
])
}
...
...
src/dnode/src/dnodePeer.c
浏览文件 @
3f88174a
...
...
@@ -29,11 +29,11 @@
#include "dnodeVWrite.h"
#include "mnode.h"
extern
void
dnodeUpdateIpSet
(
void
*
ahandle
,
SRpcIpSet
*
pIpSet
);
extern
void
dnodeUpdateIpSet
(
SRpcIpSet
*
pIpSet
);
static
void
(
*
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
dnodeProcessReqMsgFromDnode
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessReqMsgFromDnode
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
);
static
void
(
*
dnodeProcessRspMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
rpcMsg
);
static
void
dnodeProcessRspFromDnode
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessRspFromDnode
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
);
static
void
*
tsDnodeServerRpc
=
NULL
;
static
void
*
tsDnodeClientRpc
=
NULL
;
...
...
@@ -81,7 +81,7 @@ void dnodeCleanupServer() {
}
}
static
void
dnodeProcessReqMsgFromDnode
(
SRpcMsg
*
pMsg
)
{
static
void
dnodeProcessReqMsgFromDnode
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
SRpcMsg
rspMsg
;
rspMsg
.
handle
=
pMsg
->
handle
;
rspMsg
.
pCont
=
NULL
;
...
...
@@ -119,7 +119,6 @@ int32_t dnodeInitClient() {
rpcInit
.
label
=
"DND-C"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
dnodeProcessRspFromDnode
;
rpcInit
.
ufp
=
dnodeUpdateIpSet
;
rpcInit
.
sessions
=
100
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
...
...
@@ -145,9 +144,10 @@ void dnodeCleanupClient() {
}
}
static
void
dnodeProcessRspFromDnode
(
SRpcMsg
*
pMsg
)
{
static
void
dnodeProcessRspFromDnode
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
if
(
dnodeProcessRspMsgFp
[
pMsg
->
msgType
])
{
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_DM_STATUS_RSP
&&
pIpSet
)
dnodeUpdateIpSet
(
pIpSet
);
(
*
dnodeProcessRspMsgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
dError
(
"RPC %p, msg:%s is not processed"
,
pMsg
->
handle
,
taosMsg
[
pMsg
->
msgType
]);
...
...
src/dnode/src/dnodeShell.c
浏览文件 @
3f88174a
...
...
@@ -28,7 +28,7 @@
#include "dnodeShell.h"
static
void
(
*
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
);
static
int
dnodeRetrieveUserAuthInfo
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
*
tsDnodeShellRpc
=
NULL
;
static
int32_t
tsDnodeQueryReqNum
=
0
;
...
...
@@ -106,7 +106,7 @@ void dnodeCleanupShell() {
}
}
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
)
{
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
SRpcMsg
rpcMsg
;
rpcMsg
.
handle
=
pMsg
->
handle
;
rpcMsg
.
pCont
=
NULL
;
...
...
src/inc/trpc.h
浏览文件 @
3f88174a
...
...
@@ -66,10 +66,7 @@ typedef struct {
char
*
ckey
;
// ciphering key
// call back to process incoming msg, code shall be ignored by server app
void
(
*
cfp
)(
SRpcMsg
*
);
// call back to process notify the ipSet changes, for client app only
void
(
*
ufp
)(
void
*
ahandle
,
SRpcIpSet
*
pIpSet
);
void
(
*
cfp
)(
SRpcMsg
*
,
SRpcIpSet
*
);
// call back to retrieve the client auth info, for server app only
int
(
*
afp
)(
char
*
tableId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
...
...
src/rpc/src/rpcMain.c
浏览文件 @
3f88174a
...
...
@@ -55,9 +55,8 @@ typedef struct {
char
secret
[
TSDB_KEY_LEN
];
// secret for the link
char
ckey
[
TSDB_KEY_LEN
];
// ciphering key
void
(
*
cfp
)(
SRpcMsg
*
);
void
(
*
cfp
)(
SRpcMsg
*
,
SRpcIpSet
*
);
int
(
*
afp
)(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
void
(
*
ufp
)(
void
*
ahandle
,
SRpcIpSet
*
pIpSet
);
void
*
idPool
;
// handle to ID pool
void
*
tmrCtrl
;
// handle to timer
...
...
@@ -222,7 +221,6 @@ void *rpcOpen(const SRpcInit *pInit) {
if
(
pInit
->
secret
)
strcpy
(
pRpc
->
secret
,
pInit
->
secret
);
if
(
pInit
->
ckey
)
strcpy
(
pRpc
->
ckey
,
pInit
->
ckey
);
pRpc
->
spi
=
pInit
->
spi
;
pRpc
->
ufp
=
pInit
->
ufp
;
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
afp
=
pInit
->
afp
;
...
...
@@ -900,10 +898,11 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
memcpy
(
pContext
->
pRsp
,
pMsg
,
sizeof
(
SRpcMsg
));
}
else
{
// for asynchronous API
if
(
pRpc
->
ufp
&&
(
pContext
->
ipSet
.
inUse
!=
pContext
->
oldInUse
||
pContext
->
redirect
))
(
*
pRpc
->
ufp
)(
pContext
->
ahandle
,
&
pContext
->
ipSet
);
// notify the update of ipSet
SRpcIpSet
*
pIpSet
=
NULL
;
if
(
pContext
->
ipSet
.
inUse
!=
pContext
->
oldInUse
||
pContext
->
redirect
)
pIpSet
=
&
pContext
->
ipSet
;
(
*
pRpc
->
cfp
)(
pMsg
);
(
*
pRpc
->
cfp
)(
pMsg
,
pIpSet
);
}
// free the request message
...
...
@@ -924,7 +923,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
rpcMsg
.
handle
=
pConn
;
taosTmrReset
(
rpcProcessProgressTimer
,
tsRpcTimer
/
2
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
(
*
(
pRpc
->
cfp
))(
&
rpcMsg
);
(
*
(
pRpc
->
cfp
))(
&
rpcMsg
,
NULL
);
}
else
{
// it's a response
SRpcReqContext
*
pContext
=
pConn
->
pContext
;
...
...
src/rpc/test/rclient.c
浏览文件 @
3f88174a
...
...
@@ -31,22 +31,16 @@ typedef struct {
void
*
pRpc
;
}
SInfo
;
static
void
processResponse
(
SRpcMsg
*
pMsg
)
{
static
void
processResponse
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
handle
;
tTrace
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
rpcFreeCont
(
pMsg
->
pCont
)
;
if
(
pIpSet
)
pInfo
->
ipSet
=
*
pIpSet
;
rpcFreeCont
(
pMsg
->
pCont
);
sem_post
(
&
pInfo
->
rspSem
);
}
static
void
processUpdateIpSet
(
void
*
handle
,
SRpcIpSet
*
pIpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
handle
;
tTrace
(
"thread:%d, ip set is changed, index:%d"
,
pInfo
->
index
,
pIpSet
->
inUse
);
pInfo
->
ipSet
=
*
pIpSet
;
}
static
int
tcount
=
0
;
static
void
*
sendRequest
(
void
*
param
)
{
...
...
@@ -99,7 +93,6 @@ int main(int argc, char *argv[]) {
rpcInit
.
label
=
"APP"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processResponse
;
rpcInit
.
ufp
=
processUpdateIpSet
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"michael"
;
...
...
src/rpc/test/rsclient.c
浏览文件 @
3f88174a
...
...
@@ -32,12 +32,6 @@ typedef struct {
void
*
pRpc
;
}
SInfo
;
static
void
processUpdateIpSet
(
void
*
handle
,
SRpcIpSet
*
pIpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
handle
;
tTrace
(
"thread:%d, ip set is changed, index:%d"
,
pInfo
->
index
,
pIpSet
->
inUse
);
pInfo
->
ipSet
=
*
pIpSet
;
}
static
int
tcount
=
0
;
static
int
terror
=
0
;
...
...
@@ -100,8 +94,6 @@ int main(int argc, char *argv[]) {
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"APP"
;
rpcInit
.
numOfThreads
=
1
;
// rpcInit.cfp = processResponse;
rpcInit
.
ufp
=
processUpdateIpSet
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"michael"
;
...
...
src/rpc/test/rserver.c
浏览文件 @
3f88174a
...
...
@@ -113,7 +113,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
return
ret
;
}
void
processRequestMsg
(
SRpcMsg
*
pMsg
)
{
void
processRequestMsg
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
...
...
tests/script/unique/mnode/mgmt22.sim
浏览文件 @
3f88174a
...
...
@@ -8,7 +8,7 @@ system sh/cfg.sh -n dnode2 -c numOfMPeers -v 2
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 2
print ============== step1
system sh/exec_up.sh -n dnode1 -s start
system sh/exec_up.sh -n dnode1 -s start
-t
sleep 3000
sql connect
...
...
@@ -20,7 +20,7 @@ if $data2_1 != master then
endi
print ============== step2
system sh/exec_up.sh -n dnode2 -s start
system sh/exec_up.sh -n dnode2 -s start
-t
sql create dnode $hostname2
$x = 0
...
...
@@ -41,6 +41,17 @@ if $data2_2 != slave then
goto show2
endi
system sh/exec_up.sh -n dnode1 -s stop -x SIGINT
system sh/exec_up.sh -n dnode2 -s stop -x SIGINT
system sh/exec_up.sh -n dnode3 -s stop -x SIGINT
system sh/exec_up.sh -n dnode4 -s stop -x SIGINT
system sh/exec_up.sh -n dnode5 -s stop -x SIGINT
system sh/exec_up.sh -n dnode6 -s stop -x SIGINT
system sh/exec_up.sh -n dnode7 -s stop -x SIGINT
system sh/exec_up.sh -n dnode8 -s stop -x SIGINT
return
print ============== step3
sql_error drop dnode $hostname1 -x error1
print should not drop master
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录