Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
32e7e0e7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
32e7e0e7
编写于
2月 19, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
cmake file for rpc unittest
上级
f3f657a6
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
197 addition
and
106 deletion
+197
-106
src/CMakeLists.txt
src/CMakeLists.txt
+8
-8
src/client/CMakeLists.txt
src/client/CMakeLists.txt
+3
-1
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+3
-7
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+1
-0
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+58
-46
src/client/src/tscSql.c
src/client/src/tscSql.c
+8
-8
src/client/src/tscStream.c
src/client/src/tscStream.c
+1
-1
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+15
-34
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-0
src/inc/trpc.h
src/inc/trpc.h
+1
-0
src/rpc/CMakeLists.txt
src/rpc/CMakeLists.txt
+3
-0
src/rpc/inc/thaship.h
src/rpc/inc/thaship.h
+8
-0
src/rpc/inc/ttcpclient.h
src/rpc/inc/ttcpclient.h
+8
-0
src/rpc/inc/ttcpserver.h
src/rpc/inc/ttcpserver.h
+8
-0
src/rpc/inc/tudp.h
src/rpc/inc/tudp.h
+8
-0
src/rpc/src/trpc.c
src/rpc/src/trpc.c
+1
-0
src/rpc/test/CMakeLists.txt
src/rpc/test/CMakeLists.txt
+15
-0
src/rpc/test/unittest.c
src/rpc/test/unittest.c
+46
-0
未找到文件。
src/CMakeLists.txt
浏览文件 @
32e7e0e7
...
...
@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY
(
os
)
ADD_SUBDIRECTORY
(
util
)
ADD_SUBDIRECTORY
(
rpc
)
ADD_SUBDIRECTORY
(
client
)
ADD_SUBDIRECTORY
(
kit
)
ADD_SUBDIRECTORY
(
plugins
)
ADD_SUBDIRECTORY
(
sdb
)
ADD_SUBDIRECTORY
(
mnode
)
ADD_SUBDIRECTORY
(
dnode
)
ADD_SUBDIRECTORY
(
vnode
)
ADD_SUBDIRECTORY
(
connector/jdbc
)
#
ADD_SUBDIRECTORY(client)
#
ADD_SUBDIRECTORY(kit)
#
ADD_SUBDIRECTORY(plugins)
#
ADD_SUBDIRECTORY(sdb)
#
ADD_SUBDIRECTORY(mnode)
#
ADD_SUBDIRECTORY(dnode)
#
ADD_SUBDIRECTORY(vnode)
#
ADD_SUBDIRECTORY(connector/jdbc)
src/client/CMakeLists.txt
浏览文件 @
32e7e0e7
...
...
@@ -4,8 +4,10 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES
(
inc
)
INCLUDE_DIRECTORIES
(
jni
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/util/inc
)
INCLUDE_DIRECTORIES
(
${
TD_ENTERPRISE_DIR
}
/src/inc
)
INCLUDE_DIRECTORIES
(
${
TD_OS_DIR
}
/inc
)
AUX_SOURCE_DIRECTORY
(
./
src SRC
)
AUX_SOURCE_DIRECTORY
(
src SRC
)
IF
((
TD_LINUX_64
)
OR
(
TD_LINUX_32 AND TD_ARM
))
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/jni/linux
)
...
...
src/client/inc/tsclient.h
浏览文件 @
32e7e0e7
...
...
@@ -30,6 +30,7 @@ extern "C" {
#include "taosdef.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "trpc.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
...
...
@@ -324,6 +325,7 @@ typedef struct _sql_obj {
int64_t
stime
;
uint32_t
queryId
;
void
*
thandle
;
SRpcIpSet
ipSet
;
void
*
pStream
;
void
*
pSubscription
;
char
*
sqlstr
;
...
...
@@ -371,12 +373,6 @@ typedef struct _sstream {
struct
_sstream
*
prev
,
*
next
;
}
SSqlStream
;
typedef
struct
{
char
numOfIps
;
uint32_t
ip
[
TSDB_MAX_MGMT_IPS
];
char
ipstr
[
TSDB_MAX_MGMT_IPS
][
TSDB_IPv4ADDR_LEN
];
}
SIpStrList
;
// tscSql API
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
multiVnodeInsertion
);
...
...
@@ -461,7 +457,7 @@ extern void * tscQhandle;
extern
int
tscKeepConn
[];
extern
int
tsInsertHeadSize
;
extern
int
tscNumOfThreads
;
extern
S
IpStrList
tscMgmtIpList
;
extern
S
RpcIpSet
tscMgmtIpList
;
typedef
void
(
*
__async_cb_func_t
)(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
...
...
src/client/src/tscAsync.c
浏览文件 @
32e7e0e7
...
...
@@ -25,6 +25,7 @@
#include "tscSQLParser.h"
#include "tutil.h"
#include "tnote.h"
#include "tsched.h"
static
void
tscProcessFetchRow
(
SSchedMsg
*
pMsg
);
static
void
tscAsyncQueryRowsForNextVnode
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
...
...
src/client/src/tscProfile.c
浏览文件 @
32e7e0e7
...
...
@@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) {
SSqlObj
*
pSql
=
pObj
->
sqlList
;
while
(
pSql
)
{
taosStopRpcConn
(
pSql
->
thandle
);
//
taosStopRpcConn(pSql->thandle);
pSql
=
pSql
->
next
;
}
...
...
src/client/src/tscServer.c
浏览文件 @
32e7e0e7
...
...
@@ -31,10 +31,14 @@
#define TSC_MGMT_VNODE 999
S
IpStrList
tscMgmtIpList
;
S
RpcIpSet
tscMgmtIpList
;
int
tsMasterIndex
=
0
;
int
tsSlaveIndex
=
1
;
//temp
SRpcIpSet
tscMgmtIpSet
;
SRpcIpSet
tscDnodeIpSet
;
int
(
*
tscBuildMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
=
{
0
};
int
(
*
tscProcessMsgRsp
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
);
...
...
@@ -53,7 +57,7 @@ void tscPrintMgmtIp() {
tscError
(
"invalid mgmt IP list:%d"
,
tscMgmtIpList
.
numOfIps
);
}
else
{
for
(
int
i
=
0
;
i
<
tscMgmtIpList
.
numOfIps
;
++
i
)
{
tscTrace
(
"mgmt index:%d ip:%s"
,
i
,
tscMgmtIpList
.
ip
s
tr
[
i
]);
tscTrace
(
"mgmt index:%d ip:%s"
,
i
,
tscMgmtIpList
.
ip
S
tr
[
i
]);
}
}
}
...
...
@@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
tscMgmtIpList
.
numOfIps
=
pIpList
->
numOfIps
;
if
(
memcmp
(
tscMgmtIpList
.
ip
,
pIpList
->
ip
,
pIpList
->
numOfIps
*
4
)
!=
0
)
{
for
(
int
i
=
0
;
i
<
pIpList
->
numOfIps
;
++
i
)
{
tinet_ntoa
(
tscMgmtIpList
.
ip
s
tr
[
i
],
pIpList
->
ip
[
i
]);
tinet_ntoa
(
tscMgmtIpList
.
ip
S
tr
[
i
],
pIpList
->
ip
[
i
]);
tscMgmtIpList
.
ip
[
i
]
=
pIpList
->
ip
[
i
];
}
tscTrace
(
"cluster mgmt IP list:"
);
...
...
@@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
void
tscSetMgmtIpListFromEdge
()
{
if
(
tscMgmtIpList
.
numOfIps
!=
2
)
{
tscMgmtIpList
.
numOfIps
=
2
;
strcpy
(
tscMgmtIpList
.
ip
s
tr
[
0
],
tsMasterIp
);
strcpy
(
tscMgmtIpList
.
ip
S
tr
[
0
],
tsMasterIp
);
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
tsMasterIp
);
strcpy
(
tscMgmtIpList
.
ip
s
tr
[
1
],
tsMasterIp
);
strcpy
(
tscMgmtIpList
.
ip
S
tr
[
1
],
tsMasterIp
);
tscMgmtIpList
.
ip
[
1
]
=
inet_addr
(
tsMasterIp
);
tscTrace
(
"edge mgmt IP list:"
);
tscPrintMgmtIp
();
...
...
@@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
if
(
tscShouldFreeHeatBeat
(
pObj
->
pHb
))
{
tscTrace
(
"%p free HB object and release connection, pConn:%p"
,
pObj
,
pObj
->
pHb
->
thandle
);
taosCloseRpcConn
(
pObj
->
pHb
->
thandle
);
//
taosCloseRpcConn(pObj->pHb->thandle);
tscFreeSqlObj
(
pObj
->
pHb
);
tscCloseTscObj
(
pObj
);
...
...
@@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql
(
pObj
->
pHb
);
}
void
tscGetConnToMgmt
(
SSqlObj
*
pSql
,
uint8_t
*
pCode
)
{
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pSql
->
retry
<
tscGetMgmtConnMaxRetryTimes
())
{
...
...
@@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
if
(
pSql
->
cmd
.
command
>
TSDB_SQL_READ
&&
pSql
->
index
==
0
)
pSql
->
index
=
1
;
void
*
thandle
=
taosGetConnFromCache
(
tscConnCache
,
tscMgmtIpList
.
ip
[
pSql
->
index
],
TSC_MGMT_VNODE
,
pTscObj
->
user
);
if
(
thandle
==
NULL
)
{
SRpcConnInit
connInit
;
memset
(
&
connInit
,
0
,
sizeof
(
connInit
));
connInit
.
cid
=
0
;
connInit
.
sid
=
0
;
connInit
.
meterId
=
pSql
->
pTscObj
->
user
;
connInit
.
peerId
=
0
;
connInit
.
shandle
=
pTscMgmtConn
;
connInit
.
ahandle
=
pSql
;
connInit
.
peerPort
=
tsMgmtShellPort
;
connInit
.
spi
=
1
;
connInit
.
encrypt
=
0
;
connInit
.
secret
=
pSql
->
pTscObj
->
pass
;
connInit
.
peerIp
=
tscMgmtIpList
.
ipstr
[
pSql
->
index
];
thandle
=
taosOpenRpcConn
(
&
connInit
,
pCode
);
}
// if (thandle == NULL) {
// SRpcConnInit connInit;
// memset(&connInit, 0, sizeof(connInit));
// connInit.cid = 0;
// connInit.sid = 0;
// connInit.meterId = pSql->pTscObj->user;
// connInit.peerId = 0;
// connInit.shandle = pTscMgmtConn;
// connInit.ahandle = pSql;
// connInit.peerPort = tsMgmtShellPort;
// connInit.spi = 1;
// connInit.encrypt = 0;
// connInit.secret = pSql->pTscObj->pass;
//
// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
// thandle = taosOpenRpcConn(&connInit, pCode);
// }
pSql
->
thandle
=
thandle
;
pSql
->
ip
=
tscMgmtIpList
.
ip
[
pSql
->
index
];
...
...
@@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
void
*
thandle
=
taosGetConnFromCache
(
tscConnCache
,
pVPeersDesc
[
pSql
->
index
].
ip
,
pVPeersDesc
[
pSql
->
index
].
vnode
,
pTscObj
->
user
);
if
(
thandle
==
NULL
)
{
SRpcConnInit
connInit
;
tinet_ntoa
(
ipstr
,
pVPeersDesc
[
pSql
->
index
].
ip
);
memset
(
&
connInit
,
0
,
sizeof
(
connInit
));
connInit
.
cid
=
vidIndex
;
connInit
.
sid
=
0
;
connInit
.
spi
=
0
;
connInit
.
encrypt
=
0
;
connInit
.
meterId
=
pSql
->
pTscObj
->
user
;
connInit
.
peerId
=
htonl
((
pVPeersDesc
[
pSql
->
index
].
vnode
<<
TSDB_SHELL_VNODE_BITS
));
connInit
.
shandle
=
pVnodeConn
;
connInit
.
ahandle
=
pSql
;
connInit
.
peerIp
=
ipstr
;
connInit
.
peerPort
=
tsVnodeShellPort
;
thandle
=
taosOpenRpcConn
(
&
connInit
,
pCode
);
vidIndex
=
(
vidIndex
+
1
)
%
tscNumOfThreads
;
}
//
if (thandle == NULL) {
//
SRpcConnInit connInit;
//
tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
//
memset(&connInit, 0, sizeof(connInit));
//
connInit.cid = vidIndex;
//
connInit.sid = 0;
//
connInit.spi = 0;
//
connInit.encrypt = 0;
//
connInit.meterId = pSql->pTscObj->user;
//
connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
//
connInit.shandle = pVnodeConn;
//
connInit.ahandle = pSql;
//
connInit.peerIp = ipstr;
//
connInit.peerPort = tsVnodeShellPort;
//
thandle = taosOpenRpcConn(&connInit, pCode);
//
vidIndex = (vidIndex + 1) % tscNumOfThreads;
//
}
pSql
->
thandle
=
thandle
;
pSql
->
ip
=
pVPeersDesc
[
pSql
->
index
].
ip
;
...
...
@@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
tscTrace
(
"%p vnode:%d ip:%p index:%d is picked up, pConn:%p"
,
pSql
,
pVPeersDesc
[
pSql
->
index
].
vnode
,
pVPeersDesc
[
pSql
->
index
].
ip
,
pSql
->
index
,
pSql
->
thandle
);
//TODO fetch from vpeerdesc
pSql
->
ipSet
=
tscMgmtIpSet
;
break
;
}
...
...
@@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) {
size_t
totalLen
=
pSql
->
cmd
.
payloadLen
+
tsRpcHeadSize
+
sizeof
(
STaosDigest
);
// the memory will be released by taosProcessResponse, so no memory leak here
char
*
buf
=
malloc
(
total
Len
);
if
(
NULL
==
buf
)
{
char
*
pStart
=
rpcMallocCont
(
pSql
->
cmd
.
payload
Len
);
if
(
NULL
==
pStart
)
{
tscError
(
"%p msg:%s malloc fail"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
]);
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
memcpy
(
buf
,
pSql
->
cmd
.
payload
,
total
Len
);
memcpy
(
pStart
,
pSql
->
cmd
.
payload
+
tsRpcHeadSize
,
pSql
->
cmd
.
payload
Len
);
tscTrace
(
"%p msg:%s is sent to server"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
]);
char
*
pStart
=
taosBuildReqHeader
(
pSql
->
thandle
,
pSql
->
cmd
.
msgType
,
buf
);
if
(
pStart
)
{
/*
* this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable.
*/
uint64_t
signature
=
(
uint64_t
)
pSql
->
signature
;
if
(
tscUpdateVnodeMsg
[
pSql
->
cmd
.
command
])
(
*
tscUpdateVnodeMsg
[
pSql
->
cmd
.
command
])(
pSql
,
buf
);
//if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
int
ret
;
if
(
pSql
->
cmd
.
command
<
TSDB_SQL_MGMT
)
ret
=
rpcSendRequest
(
pTscMgmtConn
,
pSql
->
cmd
.
msgType
,
pStart
,
pSql
->
cmd
.
payloadLen
,
pSql
);
else
ret
=
rpcSendRequest
(
pVnodeConn
,
pSql
->
cmd
.
msgType
,
pStart
,
pSql
->
cmd
.
payloadLen
,
pSql
);
int
ret
=
taosSendMsgToPeerH
(
pSql
->
thandle
,
pStart
,
pSql
->
cmd
.
payloadLen
,
pSql
);
if
(
ret
>=
0
)
{
code
=
0
;
}
...
...
src/client/src/tscSql.c
浏览文件 @
32e7e0e7
...
...
@@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
if
(
ip
&&
ip
[
0
])
{
tscMgmtIpList
.
numOfIps
=
4
;
strcpy
(
tscMgmtIpList
.
ip
s
tr
[
0
],
ip
);
tscMgmtIpList
.
numOfIps
=
3
;
strcpy
(
tscMgmtIpList
.
ip
S
tr
[
0
],
ip
);
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
ip
);
strcpy
(
tscMgmtIpList
.
ip
str
[
1
],
i
p
);
tscMgmtIpList
.
ip
[
1
]
=
inet_addr
(
i
p
);
strcpy
(
tscMgmtIpList
.
ip
str
[
2
],
tsMaster
Ip
);
tscMgmtIpList
.
ip
[
2
]
=
inet_addr
(
ts
Master
Ip
);
strcpy
(
tscMgmtIpList
.
ipstr
[
3
],
tsSecondIp
)
;
tscMgmtIpList
.
ip
[
3
]
=
inet_addr
(
tsSecondIp
)
;
strcpy
(
tscMgmtIpList
.
ip
Str
[
1
],
tsMasterI
p
);
tscMgmtIpList
.
ip
[
1
]
=
inet_addr
(
tsMasterI
p
);
strcpy
(
tscMgmtIpList
.
ip
Str
[
2
],
tsSecond
Ip
);
tscMgmtIpList
.
ip
[
2
]
=
inet_addr
(
ts
Second
Ip
);
tscMgmtIpList
.
index
=
0
;
tscMgmtIpList
.
port
=
tsMgmtShellPort
;
}
pObj
=
(
STscObj
*
)
malloc
(
sizeof
(
STscObj
));
...
...
src/client/src/tscStream.c
浏览文件 @
32e7e0e7
...
...
@@ -19,7 +19,7 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tsched.h"
#include "taosmsg.h"
#include "tscUtil.h"
#include "tsclient.h"
...
...
src/client/src/tscSystem.c
浏览文件 @
32e7e0e7
...
...
@@ -24,8 +24,9 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tsched.h"
#include "tsclient.h"
// global, not configurable
void
*
pVnodeConn
;
void
*
pVMeterConn
;
...
...
@@ -94,18 +95,17 @@ void taos_init_imp() {
if
(
tsTscEnableRecordSql
!=
0
)
{
taosInitNote
(
tsNumOfLogLines
/
10
,
1
,
(
char
*
)
"tsc_note"
);
}
tscMgmtIpList
.
numOfIps
=
2
;
strcpy
(
tscMgmtIpList
.
ipstr
[
0
],
tsMasterIp
);
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
tsMasterIp
);
strcpy
(
tscMgmtIpList
.
ipstr
[
1
],
tsMasterIp
);
tscMgmtIpList
.
ip
[
1
]
=
inet_addr
(
tsMasterIp
);
tscMgmtIpList
.
index
=
0
;
tscMgmtIpList
.
port
=
tsMgmtShellPort
;
tscMgmtIpList
.
numOfIps
=
1
;
strcpy
(
tscMgmtIpList
.
ipStr
[
0
],
tsMasterIp
);
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
tsMasterIp
);
if
(
tsSecondIp
[
0
])
{
tscMgmtIpList
.
numOfIps
=
3
;
strcpy
(
tscMgmtIpList
.
ip
str
[
2
],
tsSecondIp
);
tscMgmtIpList
.
ip
[
2
]
=
inet_addr
(
tsSecondIp
);
tscMgmtIpList
.
numOfIps
=
2
;
strcpy
(
tscMgmtIpList
.
ip
Str
[
1
],
tsSecondIp
);
tscMgmtIpList
.
ip
[
1
]
=
inet_addr
(
tsSecondIp
);
}
tscInitMsgs
();
...
...
@@ -132,42 +132,23 @@ void taos_init_imp() {
rpcInit
.
label
=
"TSC-vnode"
;
rpcInit
.
numOfThreads
=
tscNumOfThreads
;
rpcInit
.
fp
=
tscProcessMsgFromServer
;
rpcInit
.
bits
=
20
;
rpcInit
.
numOfChanns
=
tscNumOfThreads
;
rpcInit
.
sessionsPerChann
=
tsMaxVnodeConnections
/
tscNumOfThreads
;
rpcInit
.
idMgmt
=
TAOS_ID_FREE
;
rpcInit
.
noFree
=
0
;
rpcInit
.
sessions
=
tsMaxVnodeConnections
;
rpcInit
.
connType
=
TAOS_CONN_SOCKET_TYPE_C
();
rpcInit
.
qhandle
=
tscQhandle
;
pVnodeConn
=
taosOpenRpc
(
&
rpcInit
);
pVnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
pVnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to vnode"
);
return
;
}
for
(
int
i
=
0
;
i
<
tscNumOfThreads
;
++
i
)
{
int
retVal
=
taosOpenRpcChann
(
pVnodeConn
,
i
,
rpcInit
.
sessionsPerChann
);
if
(
0
!=
retVal
)
{
tError
(
"TSC-vnode, failed to open rpc chann"
);
taosCloseRpc
(
pVnodeConn
);
return
;
}
}
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsLocalIp
;
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC-mgmt"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
fp
=
tscProcessMsgFromServer
;
rpcInit
.
bits
=
20
;
rpcInit
.
numOfChanns
=
1
;
rpcInit
.
sessionsPerChann
=
tsMaxMgmtConnections
;
rpcInit
.
idMgmt
=
TAOS_ID_FREE
;
rpcInit
.
noFree
=
0
;
rpcInit
.
sessions
=
tsMaxMgmtConnections
;
rpcInit
.
connType
=
TAOS_CONN_SOCKET_TYPE_C
();
rpcInit
.
qhandle
=
tscQhandle
;
pTscMgmtConn
=
taosOpenRpc
(
&
rpcInit
);
pTscMgmtConn
=
rpcOpen
(
&
rpcInit
);
if
(
pTscMgmtConn
==
NULL
)
{
tscError
(
"failed to init connection to mgmt"
);
return
;
...
...
@@ -183,7 +164,7 @@ void taos_init_imp() {
if
(
tscCacheHandle
==
NULL
)
tscCacheHandle
=
taosInitDataCache
(
tsMaxMeterConnections
/
2
,
tscTmr
,
refreshTime
);
tscConnCache
=
taosOpenConnCache
(
tsMaxMeterConnections
*
2
,
taosCloseRpcConn
,
tscTmr
,
tsShellActivityTimer
*
1000
);
tscConnCache
=
taosOpenConnCache
(
tsMaxMeterConnections
*
2
,
NULL
/*taosCloseRpcConn*/
,
tscTmr
,
tsShellActivityTimer
*
1000
);
initialized
=
1
;
tscTrace
(
"client is initialized successfully"
);
...
...
src/inc/taosmsg.h
浏览文件 @
32e7e0e7
...
...
@@ -688,6 +688,7 @@ typedef struct {
typedef
struct
{
int32_t
dnode
;
//the ID of dnode
int32_t
vnode
;
//the index of vnode
uint32_t
ip
;
}
SVPeerDesc
;
typedef
struct
{
...
...
src/inc/trpc.h
浏览文件 @
32e7e0e7
...
...
@@ -21,6 +21,7 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#define TAOS_CONN_UDPS 0
#define TAOS_CONN_UDPC 1
...
...
src/rpc/CMakeLists.txt
浏览文件 @
32e7e0e7
...
...
@@ -23,3 +23,6 @@ ENDIF ()
ADD_LIBRARY
(
trpc
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
trpc tutil
)
ADD_SUBDIRECTORY
(
test
)
src/rpc/inc/thaship.h
浏览文件 @
32e7e0e7
...
...
@@ -16,10 +16,18 @@
#ifndef _rpc_hash_ip_header_
#define _rpc_hash_ip_header_
#ifdef __cplusplus
extern
"C"
{
#endif
void
*
taosOpenIpHash
(
int
maxSessions
);
void
taosCloseIpHash
(
void
*
handle
);
void
*
taosAddIpHash
(
void
*
handle
,
void
*
pData
,
uint32_t
ip
,
uint16_t
port
);
void
taosDeleteIpHash
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
);
void
*
taosGetIpHash
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
);
#ifdef __cplusplus
}
#endif
#endif
src/rpc/inc/ttcpclient.h
浏览文件 @
32e7e0e7
...
...
@@ -16,6 +16,10 @@
#ifndef _taos_tcp_client_header_
#define _taos_tcp_client_header_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "taosdef.h"
void
*
taosInitTcpClient
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
);
...
...
@@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16
void
taosCloseTcpClientConnection
(
void
*
chandle
);
int
taosSendTcpClientData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
);
#ifdef __cplusplus
}
#endif
#endif
src/rpc/inc/ttcpserver.h
浏览文件 @
32e7e0e7
...
...
@@ -16,6 +16,10 @@
#ifndef _taos_tcp_server_header_
#define _taos_tcp_server_header_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "taosdef.h"
void
*
taosInitTcpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
...
...
@@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param);
void
taosCloseTcpServerConnection
(
void
*
param
);
int
taosSendTcpServerData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
);
#ifdef __cplusplus
}
#endif
#endif
src/rpc/inc/tudp.h
浏览文件 @
32e7e0e7
...
...
@@ -16,6 +16,10 @@
#ifndef _taos_udp_header_
#define _taos_udp_header_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "taosdef.h"
void
*
taosInitUdpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
...
...
@@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd);
void
taosInitMsgHdr
(
void
**
hdr
,
void
*
dest
,
int
maxPkts
);
void
taosSetMsgHdrData
(
void
*
hdr
,
char
*
data
,
int
dataLen
);
#ifdef __cplusplus
}
#endif
#endif
src/rpc/src/trpc.c
浏览文件 @
32e7e0e7
...
...
@@ -30,6 +30,7 @@
#include "lz4.h"
#include "tconncache.h"
#include "trpc.h"
#include "taoserror.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader)))
...
...
src/rpc/test/CMakeLists.txt
0 → 100644
浏览文件 @
32e7e0e7
CMAKE_MINIMUM_REQUIRED
(
VERSION 2.8
)
PROJECT
(
TDengine
)
IF
((
TD_LINUX_64
)
OR
(
TD_LINUX_32 AND TD_ARM
))
INCLUDE_DIRECTORIES
(
${
TD_OS_DIR
}
/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/util/inc
)
INCLUDE_DIRECTORIES
(
../inc
)
AUX_SOURCE_DIRECTORY
(
./ TEST_SRC
)
ADD_EXECUTABLE
(
rpcTest
${
TEST_SRC
}
)
TARGET_LINK_LIBRARIES
(
rpcTest trpc
)
ENDIF
()
src/rpc/test/unittest.c
0 → 100644
浏览文件 @
32e7e0e7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include <stdint.h>
int32_t
main
(
int32_t
argc
,
char
*
argv
[])
{
dPrint
(
"unit test for rpc module"
);
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
"0.0.0.0"
;
rpcInit
.
localPort
=
7000
;
rpcInit
.
label
=
"unittest"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
fp
=
NULL
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
connType
=
TAOS_CONN_SOCKET_TYPE_S
();
rpcInit
.
idleTime
=
2000
;
void
*
pConn
=
rpcOpen
(
&
rpcInit
);
if
(
pConn
!=
NULL
)
{
dPrint
(
"conection is opened"
);
}
else
{
dError
(
"failed to initialize rpc"
);
}
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录