Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
b00ac2f5
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b00ac2f5
编写于
1月 02, 2021
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
share rpc obj
上级
ea28d235
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
120 addition
and
82 deletion
+120
-82
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+9
-9
src/client/src/tscServer.c
src/client/src/tscServer.c
+14
-10
src/client/src/tscSql.c
src/client/src/tscSql.c
+6
-21
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+8
-4
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+0
-1
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+83
-37
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
b00ac2f5
...
@@ -290,6 +290,12 @@ typedef struct {
...
@@ -290,6 +290,12 @@ typedef struct {
struct
SLocalReducer
*
pLocalReducer
;
struct
SLocalReducer
*
pLocalReducer
;
}
SSqlRes
;
}
SSqlRes
;
typedef
struct
{
char
key
[
512
];
SRpcCorEpSet
*
tscCorMgmtEpSet
;
void
*
pDnodeConn
;
}
SRpcObj
;
typedef
struct
STscObj
{
typedef
struct
STscObj
{
void
*
signature
;
void
*
signature
;
void
*
pTimer
;
void
*
pTimer
;
...
@@ -305,9 +311,7 @@ typedef struct STscObj {
...
@@ -305,9 +311,7 @@ typedef struct STscObj {
int64_t
hbrid
;
int64_t
hbrid
;
struct
SSqlObj
*
sqlList
;
struct
SSqlObj
*
sqlList
;
struct
SSqlStream
*
streamList
;
struct
SSqlStream
*
streamList
;
SRpcCorEpSet
*
tscCorMgmtEpSet
;
SRpcObj
*
pRpcObj
;
void
*
pDnodeConn
;
void
*
pRpcObj
;
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
int32_t
numOfObj
;
// number of sqlObj from this tscObj
int32_t
numOfObj
;
// number of sqlObj from this tscObj
}
STscObj
;
}
STscObj
;
...
@@ -378,14 +382,10 @@ typedef struct SSqlStream {
...
@@ -378,14 +382,10 @@ typedef struct SSqlStream {
void
tscSetStreamDestTable
(
SSqlStream
*
pStream
,
const
char
*
dstTable
);
void
tscSetStreamDestTable
(
SSqlStream
*
pStream
,
const
char
*
dstTable
);
typedef
struct
{
char
key
[
512
];
void
*
pDnodeConn
;
}
SRpcObj
;
void
*
tscAcquireRpc
(
const
char
*
insK
ey
);
void
*
tscAcquireRpc
(
const
char
*
k
ey
);
void
tscReleaseRpc
(
void
*
param
);
void
tscReleaseRpc
(
void
*
param
);
int32_t
tscInitRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secret
,
void
**
pRpcObj
,
void
**
pDnodeConn
);
int32_t
tscInitRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secret
,
void
**
pRpcObj
);
void
tscInitMsgsFp
();
void
tscInitMsgsFp
();
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
initial
);
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
initial
);
...
...
src/client/src/tscServer.c
浏览文件 @
b00ac2f5
...
@@ -72,7 +72,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
...
@@ -72,7 +72,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
}
}
static
void
tscDumpMgmtEpSet
(
SSqlObj
*
pSql
)
{
static
void
tscDumpMgmtEpSet
(
SSqlObj
*
pSql
)
{
SRpcCorEpSet
*
pCorEpSet
=
pSql
->
pTscObj
->
tscCorMgmtEpSet
;
SRpcCorEpSet
*
pCorEpSet
=
pSql
->
pTscObj
->
pRpcObj
->
tscCorMgmtEpSet
;
taosCorBeginRead
(
&
pCorEpSet
->
version
);
taosCorBeginRead
(
&
pCorEpSet
->
version
);
pSql
->
epSet
=
pCorEpSet
->
epSet
;
pSql
->
epSet
=
pCorEpSet
->
epSet
;
taosCorEndRead
(
&
pCorEpSet
->
version
);
taosCorEndRead
(
&
pCorEpSet
->
version
);
...
@@ -95,7 +95,7 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
...
@@ -95,7 +95,7 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
}
}
void
tscUpdateMgmtEpSet
(
SSqlObj
*
pSql
,
SRpcEpSet
*
pEpSet
)
{
void
tscUpdateMgmtEpSet
(
SSqlObj
*
pSql
,
SRpcEpSet
*
pEpSet
)
{
// no need to update if equal
// no need to update if equal
SRpcCorEpSet
*
pCorEpSet
=
pSql
->
pTscObj
->
tscCorMgmtEpSet
;
SRpcCorEpSet
*
pCorEpSet
=
pSql
->
pTscObj
->
pRpcObj
->
tscCorMgmtEpSet
;
taosCorBeginWrite
(
&
pCorEpSet
->
version
);
taosCorBeginWrite
(
&
pCorEpSet
->
version
);
pCorEpSet
->
epSet
=
*
pEpSet
;
pCorEpSet
->
epSet
=
*
pEpSet
;
taosCorEndWrite
(
&
pCorEpSet
->
version
);
taosCorEndWrite
(
&
pCorEpSet
->
version
);
...
@@ -151,14 +151,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
...
@@ -151,14 +151,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SRpcEpSet
*
epSet
=
&
pRsp
->
epSet
;
SRpcEpSet
*
epSet
=
&
pRsp
->
epSet
;
if
(
epSet
->
numOfEps
>
0
)
{
if
(
epSet
->
numOfEps
>
0
)
{
tscEpSetHtons
(
epSet
);
tscEpSetHtons
(
epSet
);
if
(
!
tscEpSetIsEqual
(
&
pSql
->
pTscObj
->
tscCorMgmtEpSet
->
epSet
,
epSet
))
{
tscTrace
(
"%p updating epset: numOfEps: %d, inUse: %d"
,
pSql
,
epSet
->
numOfEps
,
epSet
->
inUse
);
//SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet;
for
(
int8_t
i
=
0
;
i
<
epSet
->
numOfEps
;
i
++
)
{
//if (!tscEpSetIsEqual(&pCorEpSet->epSet, epSet)) {
tscTrace
(
"endpoint %d: fqdn=%s, port=%d"
,
i
,
epSet
->
fqdn
[
i
],
epSet
->
port
[
i
]);
// tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse);
}
// for (int8_t i = 0; i < epSet->numOfEps; i++) {
// tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]);
// }
//}
//concurrency problem, update mgmt epset anyway
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
}
}
}
pSql
->
pTscObj
->
connId
=
htonl
(
pRsp
->
connId
);
pSql
->
pTscObj
->
connId
=
htonl
(
pRsp
->
connId
);
...
@@ -264,7 +267,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
...
@@ -264,7 +267,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.
code
=
0
.
code
=
0
};
};
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
rpcSendRequest
(
pObj
->
pRpcObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
src/client/src/tscSql.c
浏览文件 @
b00ac2f5
...
@@ -94,8 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
...
@@ -94,8 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
sprintf
(
rpcKey
,
"%s:%s:%s:%d"
,
user
,
pass
,
ip
,
port
);
sprintf
(
rpcKey
,
"%s:%s:%s:%d"
,
user
,
pass
,
ip
,
port
);
void
*
pRpcObj
=
NULL
;
void
*
pRpcObj
=
NULL
;
void
*
pDnodeConn
=
NULL
;
if
(
tscInitRpc
(
rpcKey
,
user
,
secretEncrypt
,
&
pRpcObj
)
!=
0
)
{
if
(
tscInitRpc
(
rpcKey
,
user
,
secretEncrypt
,
&
pRpcObj
,
&
pDnodeConn
)
!=
0
)
{
terrno
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
terrno
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
return
NULL
;
return
NULL
;
}
}
...
@@ -106,20 +105,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
...
@@ -106,20 +105,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
tscReleaseRpc
(
pRpcObj
);
tscReleaseRpc
(
pRpcObj
);
return
NULL
;
return
NULL
;
}
}
// set up tscObj's mgmtEpSet
pObj
->
tscCorMgmtEpSet
=
(
SRpcCorEpSet
*
)
malloc
(
sizeof
(
SRpcCorEpSet
));
if
(
NULL
==
pObj
->
tscCorMgmtEpSet
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
->
tscCorMgmtEpSet
);
free
(
pObj
);
}
memcpy
(
pObj
->
tscCorMgmtEpSet
,
&
corMgmtEpSet
,
sizeof
(
SRpcCorEpSet
));
pObj
->
signature
=
pObj
;
pObj
->
signature
=
pObj
;
pObj
->
pRpcObj
=
pRpcObj
;
pObj
->
pRpcObj
=
(
SRpcObj
*
)
pRpcObj
;
pObj
->
pDnodeConn
=
pDnodeConn
;
memcpy
(
pObj
->
pRpcObj
->
tscCorMgmtEpSet
,
&
corMgmtEpSet
,
sizeof
(
SRpcCorEpSet
));
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
secretEncryptLen
=
MIN
(
secretEncryptLen
,
sizeof
(
pObj
->
pass
));
secretEncryptLen
=
MIN
(
secretEncryptLen
,
sizeof
(
pObj
->
pass
));
memcpy
(
pObj
->
pass
,
secretEncrypt
,
secretEncryptLen
);
memcpy
(
pObj
->
pass
,
secretEncrypt
,
secretEncryptLen
);
...
@@ -130,7 +118,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
...
@@ -130,7 +118,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if
(
len
>=
TSDB_DB_NAME_LEN
)
{
if
(
len
>=
TSDB_DB_NAME_LEN
)
{
terrno
=
TSDB_CODE_TSC_INVALID_DB_LENGTH
;
terrno
=
TSDB_CODE_TSC_INVALID_DB_LENGTH
;
tscReleaseRpc
(
pRpcObj
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
->
tscCorMgmtEpSet
);
free
(
pObj
);
free
(
pObj
);
return
NULL
;
return
NULL
;
}
}
...
@@ -148,7 +135,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
...
@@ -148,7 +135,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if
(
NULL
==
pSql
)
{
if
(
NULL
==
pSql
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscReleaseRpc
(
pRpcObj
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
->
tscCorMgmtEpSet
);
free
(
pObj
);
free
(
pObj
);
return
NULL
;
return
NULL
;
}
}
...
@@ -166,7 +152,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
...
@@ -166,7 +152,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscReleaseRpc
(
pRpcObj
);
tscReleaseRpc
(
pRpcObj
);
free
(
pSql
);
free
(
pSql
);
free
(
pObj
->
tscCorMgmtEpSet
);
free
(
pObj
);
free
(
pObj
);
return
NULL
;
return
NULL
;
}
}
...
@@ -205,7 +190,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
...
@@ -205,7 +190,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return
NULL
;
return
NULL
;
}
}
tscDebug
(
"%p DB connection is opening,
dnodeConn:%p"
,
pObj
,
p
Obj
->
pDnodeConn
);
tscDebug
(
"%p DB connection is opening,
rpcObj: %p, dnodeConn:%p"
,
pObj
,
pObj
->
pRpcObj
,
pObj
->
pRpc
Obj
->
pDnodeConn
);
taos_free_result
(
pSql
);
taos_free_result
(
pSql
);
// version compare only requires the first 3 segments of the version string
// version compare only requires the first 3 segments of the version string
...
@@ -282,7 +267,7 @@ void taos_close(TAOS *taos) {
...
@@ -282,7 +267,7 @@ void taos_close(TAOS *taos) {
return
;
return
;
}
}
tscDebug
(
"%p try to free tscObj
and close dnodeConn:%p"
,
pObj
,
pObj
->
pDnodeConn
);
tscDebug
(
"%p try to free tscObj
"
,
pObj
);
if
(
pObj
->
signature
!=
pObj
)
{
if
(
pObj
->
signature
!=
pObj
)
{
tscDebug
(
"%p already closed or invalid tscObj"
,
pObj
);
tscDebug
(
"%p already closed or invalid tscObj"
,
pObj
);
return
;
return
;
...
@@ -302,7 +287,7 @@ void taos_close(TAOS *taos) {
...
@@ -302,7 +287,7 @@ void taos_close(TAOS *taos) {
}
}
}
}
tscDebug
(
"%p all sqlObj are freed, free tscObj
and close dnodeConn:%p"
,
pObj
,
pObj
->
pDnodeConn
);
tscDebug
(
"%p all sqlObj are freed, free tscObj
"
,
pObj
);
taosRemoveRef
(
tscRefId
,
pObj
->
rid
);
taosRemoveRef
(
tscRefId
,
pObj
->
rid
);
}
}
...
...
src/client/src/tscSystem.c
浏览文件 @
b00ac2f5
...
@@ -53,6 +53,7 @@ void tscFreeRpcObj(void *param) {
...
@@ -53,6 +53,7 @@ void tscFreeRpcObj(void *param) {
assert
(
param
);
assert
(
param
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)(
param
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)(
param
);
rpcClose
(
pRpcObj
->
pDnodeConn
);
rpcClose
(
pRpcObj
->
pDnodeConn
);
tfree
(
pRpcObj
->
tscCorMgmtEpSet
);
}
}
void
*
tscAcquireRpc
(
const
char
*
key
)
{
void
*
tscAcquireRpc
(
const
char
*
key
)
{
SRpcObj
*
pRpcObj
=
taosCacheAcquireByKey
(
tscRpcCache
,
key
,
strlen
(
key
));
SRpcObj
*
pRpcObj
=
taosCacheAcquireByKey
(
tscRpcCache
,
key
,
strlen
(
key
));
...
@@ -71,13 +72,12 @@ void tscReleaseRpc(void *param) {
...
@@ -71,13 +72,12 @@ void tscReleaseRpc(void *param) {
pthread_mutex_unlock
(
&
rpcObjMutex
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
}
}
int32_t
tscInitRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secretEncrypt
,
void
**
ppRpcObj
,
void
**
pDnodeConn
)
{
int32_t
tscInitRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secretEncrypt
,
void
**
ppRpcObj
)
{
pthread_mutex_lock
(
&
rpcObjMutex
);
pthread_mutex_lock
(
&
rpcObjMutex
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)
tscAcquireRpc
(
key
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)
tscAcquireRpc
(
key
);
if
(
pRpcObj
!=
NULL
)
{
if
(
pRpcObj
!=
NULL
)
{
*
ppRpcObj
=
pRpcObj
;
*
ppRpcObj
=
pRpcObj
;
*
pDnodeConn
=
pRpcObj
->
pDnodeConn
;
pthread_mutex_unlock
(
&
rpcObjMutex
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
0
;
return
0
;
}
}
...
@@ -86,7 +86,7 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt,
...
@@ -86,7 +86,7 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt,
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
numOfThreads
=
tscNumOfThreads
*
2
;
rpcInit
.
cfp
=
tscProcessMsgFromServer
;
rpcInit
.
cfp
=
tscProcessMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
...
@@ -111,9 +111,13 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt,
...
@@ -111,9 +111,13 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt,
pthread_mutex_unlock
(
&
rpcObjMutex
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
-
1
;
return
-
1
;
}
}
pRpcObj
->
tscCorMgmtEpSet
=
malloc
(
sizeof
(
SRpcCorEpSet
));
if
(
pRpcObj
->
tscCorMgmtEpSet
==
NULL
)
{
rpcClose
(
rpcObj
.
pDnodeConn
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
}
*
ppRpcObj
=
pRpcObj
;
*
ppRpcObj
=
pRpcObj
;
*
pDnodeConn
=
pRpcObj
->
pDnodeConn
;
pthread_mutex_unlock
(
&
rpcObjMutex
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
0
;
return
0
;
}
}
...
...
src/client/src/tscUtil.c
浏览文件 @
b00ac2f5
...
@@ -896,7 +896,6 @@ void tscCloseTscObj(void *param) {
...
@@ -896,7 +896,6 @@ void tscCloseTscObj(void *param) {
taosTmrStopA
(
&
(
pObj
->
pTimer
));
taosTmrStopA
(
&
(
pObj
->
pTimer
));
tscReleaseRpc
(
pObj
->
pRpcObj
);
tscReleaseRpc
(
pObj
->
pRpcObj
);
tfree
(
pObj
->
tscCorMgmtEpSet
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
tfree
(
pObj
);
tfree
(
pObj
);
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
b00ac2f5
...
@@ -55,6 +55,13 @@ typedef struct SThreadObj {
...
@@ -55,6 +55,13 @@ typedef struct SThreadObj {
void
*
(
*
processData
)(
SRecvInfo
*
pPacket
);
void
*
(
*
processData
)(
SRecvInfo
*
pPacket
);
}
SThreadObj
;
}
SThreadObj
;
typedef
struct
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SThreadObj
**
pThreadObj
;
}
SClientObj
;
typedef
struct
{
typedef
struct
{
SOCKET
fd
;
SOCKET
fd
;
uint32_t
ip
;
uint32_t
ip
;
...
@@ -121,6 +128,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
...
@@ -121,6 +128,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj
->
processData
=
fp
;
pThreadObj
->
processData
=
fp
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
stop
=
false
;
}
}
// initialize mutex, thread, fd which may fail
// initialize mutex, thread, fd which may fail
...
@@ -171,6 +179,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
...
@@ -171,6 +179,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
}
static
void
taosStopTcpThread
(
SThreadObj
*
pThreadObj
)
{
static
void
taosStopTcpThread
(
SThreadObj
*
pThreadObj
)
{
if
(
pThreadObj
==
NULL
)
{
return
;}
// save thread into local variable and signal thread to stop
// save thread into local variable and signal thread to stop
pthread_t
thread
=
pThreadObj
->
thread
;
pthread_t
thread
=
pThreadObj
->
thread
;
if
(
!
taosCheckPthreadValid
(
thread
))
{
if
(
!
taosCheckPthreadValid
(
thread
))
{
...
@@ -275,48 +284,77 @@ static void *taosAcceptTcpConnection(void *arg) {
...
@@ -275,48 +284,77 @@ static void *taosAcceptTcpConnection(void *arg) {
return
NULL
;
return
NULL
;
}
}
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
)
{
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SThreadObj
*
pThreadObj
;
SClientObj
*
pClientObj
=
(
SClientObj
*
)
calloc
(
1
,
sizeof
(
SClientObj
));
pthread_attr_t
thattr
;
if
(
pClientObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
pThreadObj
=
(
SThreadObj
*
)
malloc
(
sizeof
(
SThreadObj
));
memset
(
pThreadObj
,
0
,
sizeof
(
SThreadObj
));
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
ip
=
ip
;
pThreadObj
->
shandle
=
shandle
;
if
(
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP client mutex(%s)"
,
label
,
strerror
(
errno
));
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
return
NULL
;
}
}
pThreadObj
->
pollFd
=
(
SOCKET
)
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tstrncpy
(
pClientObj
->
label
,
label
,
sizeof
(
pClientObj
->
label
));
tError
(
"%s failed to create TCP client epoll"
,
label
);
pClientObj
->
numOfThreads
=
numOfThreads
;
free
(
pThreadObj
);
pClientObj
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
numOfThreads
,
sizeof
(
SThreadObj
*
));
if
(
pClientObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
tfree
(
pClientObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
}
pThreadObj
->
processData
=
fp
;
int
code
=
0
;
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
int
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
pthread_attr_destroy
(
&
thattr
);
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
if
(
code
!=
0
)
{
SThreadObj
*
pThreadObj
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
taosCloseSocket
(
pThreadObj
->
pollFd
);
if
(
pThreadObj
==
NULL
)
{
free
(
pThreadObj
);
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"%s failed to create TCP read data thread(%s)"
,
label
,
strerror
(
errno
));
for
(
int
j
=
0
;
j
<
i
;
++
j
)
free
(
pClientObj
->
pThreadObj
[
j
]);
free
(
pClientObj
);
pthread_attr_destroy
(
&
thattr
);
return
NULL
;
return
NULL
;
}
}
pClientObj
->
pThreadObj
[
i
]
=
pThreadObj
;
taosResetPthread
(
&
pThreadObj
->
thread
);
pThreadObj
->
ip
=
ip
;
pThreadObj
->
stop
=
false
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
processData
=
fp
;
}
// initialize mutex, thread, fd which may fail
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
i
];
code
=
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
);
if
(
code
<
0
)
{
tError
(
"%s failed to init TCP process data mutex(%s)"
,
label
,
strerror
(
errno
));
break
;
}
tDebug
(
"%s TCP client is initialized, ip:%u:%hu"
,
label
,
ip
,
port
);
pThreadObj
->
pollFd
=
(
int64_t
)
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
code
=
-
1
;
break
;
}
return
pThreadObj
;
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
if
(
code
!=
0
)
{
tError
(
"%s failed to create TCP process data thread(%s)"
,
label
,
strerror
(
errno
));
break
;
}
pThreadObj
->
threadId
=
i
;
}
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosCleanUpTcpClient
(
pClientObj
);
pClientObj
=
NULL
;
}
return
pClientObj
;
}
}
void
taosStopTcpClient
(
void
*
chandle
)
{
void
taosStopTcpClient
(
void
*
chandle
)
{
...
@@ -327,15 +365,23 @@ void taosStopTcpClient(void *chandle) {
...
@@ -327,15 +365,23 @@ void taosStopTcpClient(void *chandle) {
}
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
S
ThreadObj
*
pThread
Obj
=
chandle
;
S
ClientObj
*
pClient
Obj
=
chandle
;
if
(
p
Thread
Obj
==
NULL
)
return
;
if
(
p
Client
Obj
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pClientObj
->
numOfThreads
;
++
i
)
{
tDebug
(
"%s TCP client will be cleaned up"
,
pThreadObj
->
label
)
;
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
i
]
;
taosStopTcpThread
(
pThreadObj
);
taosStopTcpThread
(
pThreadObj
);
}
tDebug
(
"%s TCP client is cleaned up"
,
pClientObj
->
label
);
tfree
(
pClientObj
->
pThreadObj
);
tfree
(
pClientObj
);
}
}
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
)
{
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
)
{
SThreadObj
*
pThreadObj
=
shandle
;
SClientObj
*
pClientObj
=
shandle
;
int32_t
index
=
atomic_load_32
(
&
pClientObj
->
index
)
%
pClientObj
->
numOfThreads
;
atomic_store_32
(
&
pClientObj
->
index
,
index
+
1
);
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
index
];
SOCKET
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pThreadObj
->
ip
);
SOCKET
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pThreadObj
->
ip
);
if
(
fd
<
0
)
return
NULL
;
if
(
fd
<
0
)
return
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录