Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
7cfed507
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7cfed507
编写于
7月 01, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 01, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2510 from taosdata/hotfix/rpcSendRequest
change the rpcMsg definition
上级
5d7a9103
4711bb36
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
39 addition
and
35 deletion
+39
-35
src/client/src/tscServer.c
src/client/src/tscServer.c
+4
-3
src/inc/trpc.h
src/inc/trpc.h
+4
-4
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+1
-1
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+9
-9
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+9
-9
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+9
-6
src/rpc/test/rclient.c
src/rpc/test/rclient.c
+3
-3
未找到文件。
src/client/src/tscServer.c
浏览文件 @
7cfed507
...
...
@@ -191,7 +191,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.
msgType
=
pSql
->
cmd
.
msgType
,
.
pCont
=
pMsg
,
.
contLen
=
pSql
->
cmd
.
payloadLen
,
.
handle
=
pSql
,
.
ahandle
=
pSql
,
.
handle
=
&
pSql
->
pRpcCtx
,
.
code
=
0
};
...
...
@@ -199,12 +200,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
/*pSql->pRpcCtx = */
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
ipList
,
&
rpcMsg
);
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
ipList
,
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcIpSet
*
pIpSet
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
rpcMsg
->
handle
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
rpcMsg
->
a
handle
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscError
(
"%p sql is already released"
,
pSql
);
return
;
...
...
src/inc/trpc.h
浏览文件 @
7cfed507
...
...
@@ -47,8 +47,8 @@ typedef struct SRpcMsg {
void
*
pCont
;
int
contLen
;
int32_t
code
;
void
*
handle
;
void
*
ahandle
;
//
app handle set by client, for debug purpose
void
*
handle
;
// rpc handle returned to app
void
*
ahandle
;
//
app handle set by client
}
SRpcMsg
;
typedef
struct
SRpcInit
{
...
...
@@ -78,11 +78,11 @@ void rpcClose(void *);
void
*
rpcMallocCont
(
int
contLen
);
void
rpcFreeCont
(
void
*
pCont
);
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
);
void
*
rpcSendRequest
(
void
*
thandle
,
const
SRpcIpSet
*
pIpSet
,
const
SRpcMsg
*
pMsg
);
void
rpcSendRequest
(
void
*
thandle
,
const
SRpcIpSet
*
pIpSet
,
SRpcMsg
*
pMsg
);
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
);
void
rpcSendRedirectRsp
(
void
*
pConn
,
const
SRpcIpSet
*
pIpSet
);
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
);
void
rpcSendRecv
(
void
*
shandle
,
SRpcIpSet
*
pIpSet
,
const
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
void
rpcSendRecv
(
void
*
shandle
,
SRpcIpSet
*
pIpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
);
void
rpcCancelRequest
(
void
*
pContext
);
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
7cfed507
...
...
@@ -264,7 +264,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
strcpy
(
pMdCfgDnode
->
config
,
pCmCfgDnode
->
config
);
SRpcMsg
rpcMdCfgDnodeMsg
=
{
.
handle
=
0
,
.
a
handle
=
0
,
.
code
=
0
,
.
msgType
=
TSDB_MSG_TYPE_MD_CONFIG_DNODE
,
.
pCont
=
pMdCfgDnode
,
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
7cfed507
...
...
@@ -1574,7 +1574,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SRpcIpSet
ipSet
=
mnodeGetIpSetFromVgroup
(
pMsg
->
pVgroup
);
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
,
.
ahandle
=
pMsg
,
.
pCont
=
pMDCreate
,
.
contLen
=
htonl
(
pMDCreate
->
contLen
),
.
code
=
0
,
...
...
@@ -1751,7 +1751,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
mInfo
(
"app:%p:%p, table:%s, send drop ctable msg"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDrop
->
tableId
);
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
,
.
ahandle
=
pMsg
,
.
pCont
=
pDrop
,
.
contLen
=
sizeof
(
SMDDropTableMsg
),
.
code
=
0
,
...
...
@@ -1799,7 +1799,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SRpcIpSet
ipSet
=
mnodeGetIpSetFromVgroup
(
pMsg
->
pVgroup
);
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
,
.
ahandle
=
pMsg
,
.
pCont
=
pMDCreate
,
.
contLen
=
htonl
(
pMDCreate
->
contLen
),
.
code
=
0
,
...
...
@@ -2144,9 +2144,9 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
// handle drop child response
static
void
mnodeProcessDropChildTableRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
if
(
rpcMsg
->
a
handle
==
NULL
)
return
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
a
handle
;
mnodeMsg
->
received
++
;
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
mnodeMsg
->
pTable
;
...
...
@@ -2195,9 +2195,9 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
* if failed, drop the table cached
*/
static
void
mnodeProcessCreateChildTableRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
if
(
rpcMsg
->
a
handle
==
NULL
)
return
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
a
handle
;
mnodeMsg
->
received
++
;
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
mnodeMsg
->
pTable
;
...
...
@@ -2238,9 +2238,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
}
static
void
mnodeProcessAlterTableRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
if
(
rpcMsg
->
a
handle
==
NULL
)
return
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
a
handle
;
mnodeMsg
->
received
++
;
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
mnodeMsg
->
pTable
;
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
7cfed507
...
...
@@ -652,7 +652,7 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) {
void
mnodeSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
SMDCreateVnodeMsg
*
pCreate
=
mnodeBuildCreateVnodeMsg
(
pVgroup
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
.
ahandle
=
ahandle
,
.
pCont
=
pCreate
,
.
contLen
=
pCreate
?
sizeof
(
SMDCreateVnodeMsg
)
:
0
,
.
code
=
0
,
...
...
@@ -673,9 +673,9 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
}
static
void
mnodeProcessCreateVnodeRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
if
(
rpcMsg
->
a
handle
==
NULL
)
return
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
a
handle
;
mnodeMsg
->
received
++
;
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
mnodeMsg
->
successed
++
;
...
...
@@ -686,7 +686,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SVgObj
*
pVgroup
=
mnodeMsg
->
pVgroup
;
mDebug
(
"vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p"
,
pVgroup
->
vgId
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
received
,
mnodeMsg
->
successed
,
mnodeMsg
->
expected
,
mnodeMsg
->
rpcMsg
.
handle
,
rpcMsg
->
handle
);
mnodeMsg
->
rpcMsg
.
handle
,
rpcMsg
->
a
handle
);
if
(
mnodeMsg
->
received
!=
mnodeMsg
->
expected
)
return
;
...
...
@@ -718,7 +718,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) {
void
mnodeSendDropVnodeMsg
(
int32_t
vgId
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
SMDDropVnodeMsg
*
pDrop
=
mnodeBuildDropVnodeMsg
(
vgId
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
.
ahandle
=
ahandle
,
.
pCont
=
pDrop
,
.
contLen
=
pDrop
?
sizeof
(
SMDDropVnodeMsg
)
:
0
,
.
code
=
0
,
...
...
@@ -737,10 +737,10 @@ static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
}
static
void
mnodeProcessDropVnodeRsp
(
SRpcMsg
*
rpcMsg
)
{
mDebug
(
"drop vnode rsp is received, handle:%p"
,
rpcMsg
->
handle
);
if
(
rpcMsg
->
handle
==
NULL
)
return
;
mDebug
(
"drop vnode rsp is received, handle:%p"
,
rpcMsg
->
a
handle
);
if
(
rpcMsg
->
a
handle
==
NULL
)
return
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
a
handle
;
mnodeMsg
->
received
++
;
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
mnodeMsg
->
code
=
rpcMsg
->
code
;
...
...
@@ -750,7 +750,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
SVgObj
*
pVgroup
=
mnodeMsg
->
pVgroup
;
mDebug
(
"vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p"
,
pVgroup
->
vgId
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
received
,
mnodeMsg
->
successed
,
mnodeMsg
->
expected
,
mnodeMsg
->
rpcMsg
.
handle
,
rpcMsg
->
handle
);
mnodeMsg
->
rpcMsg
.
handle
,
rpcMsg
->
a
handle
);
if
(
mnodeMsg
->
received
!=
mnodeMsg
->
expected
)
return
;
...
...
src/rpc/src/rpcMain.c
浏览文件 @
7cfed507
...
...
@@ -354,13 +354,13 @@ void *rpcReallocCont(void *ptr, int contLen) {
return
start
+
sizeof
(
SRpcReqContext
)
+
sizeof
(
SRpcHead
);
}
void
*
rpcSendRequest
(
void
*
shandle
,
const
SRpcIpSet
*
pIpSet
,
const
SRpcMsg
*
pMsg
)
{
void
rpcSendRequest
(
void
*
shandle
,
const
SRpcIpSet
*
pIpSet
,
SRpcMsg
*
pMsg
)
{
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
shandle
;
SRpcReqContext
*
pContext
;
int
contLen
=
rpcCompressRpcMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
);
pContext
=
(
SRpcReqContext
*
)
(
pMsg
->
pCont
-
sizeof
(
SRpcHead
)
-
sizeof
(
SRpcReqContext
));
pContext
->
ahandle
=
pMsg
->
handle
;
pContext
->
ahandle
=
pMsg
->
a
handle
;
pContext
->
pRpc
=
(
SRpcInfo
*
)
shandle
;
pContext
->
ipSet
=
*
pIpSet
;
pContext
->
contLen
=
contLen
;
...
...
@@ -380,9 +380,12 @@ void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg
||
type
==
TSDB_MSG_TYPE_CM_SHOW
)
pContext
->
connType
=
RPC_CONN_TCPC
;
// set the handle to pContext, so app can cancel the request
if
(
pMsg
->
handle
)
*
((
void
**
)
pMsg
->
handle
)
=
pContext
;
rpcSendReqToServer
(
pRpc
,
pContext
);
return
pContext
;
return
;
}
void
rpcSendResponse
(
const
SRpcMsg
*
pRsp
)
{
...
...
@@ -483,7 +486,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return
0
;
}
void
rpcSendRecv
(
void
*
shandle
,
SRpcIpSet
*
pIpSet
,
const
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
void
rpcSendRecv
(
void
*
shandle
,
SRpcIpSet
*
pIpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
SRpcReqContext
*
pContext
;
pContext
=
(
SRpcReqContext
*
)
(
pMsg
->
pCont
-
sizeof
(
SRpcHead
)
-
sizeof
(
SRpcReqContext
));
...
...
@@ -1051,7 +1054,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
}
else
{
// it's a response
SRpcReqContext
*
pContext
=
pConn
->
pContext
;
rpcMsg
.
handle
=
pContext
->
ahandle
;
rpcMsg
.
handle
=
pContext
;
pConn
->
pContext
=
NULL
;
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
...
...
@@ -1255,7 +1258,7 @@ static void rpcProcessConnError(void *param, void *id) {
if
(
pContext
->
numOfTry
>=
pContext
->
ipSet
.
numOfIps
)
{
rpcMsg
.
msgType
=
pContext
->
msgType
+
1
;
rpcMsg
.
handle
=
pContext
->
ahandle
;
rpcMsg
.
a
handle
=
pContext
->
ahandle
;
rpcMsg
.
code
=
pContext
->
code
;
rpcMsg
.
pCont
=
NULL
;
rpcMsg
.
contLen
=
0
;
...
...
src/rpc/test/rclient.c
浏览文件 @
7cfed507
...
...
@@ -33,7 +33,7 @@ typedef struct {
}
SInfo
;
static
void
processResponse
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
handle
;
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
a
handle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
if
(
pIpSet
)
pInfo
->
ipSet
=
*
pIpSet
;
...
...
@@ -46,7 +46,7 @@ static int tcount = 0;
static
void
*
sendRequest
(
void
*
param
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
param
;
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
=
{
0
}
;
tDebug
(
"thread:%d, start to send request"
,
pInfo
->
index
);
...
...
@@ -54,7 +54,7 @@ static void *sendRequest(void *param) {
pInfo
->
num
++
;
rpcMsg
.
pCont
=
rpcMallocCont
(
pInfo
->
msgSize
);
rpcMsg
.
contLen
=
pInfo
->
msgSize
;
rpcMsg
.
handle
=
pInfo
;
rpcMsg
.
a
handle
=
pInfo
;
rpcMsg
.
msgType
=
1
;
tDebug
(
"thread:%d, send request, contLen:%d num:%d"
,
pInfo
->
index
,
pInfo
->
msgSize
,
pInfo
->
num
);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
ipSet
,
&
rpcMsg
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录