Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
2ab56e4f
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
2ab56e4f
编写于
1月 28, 2021
作者:
H
haojun Liao
提交者:
GitHub
1月 28, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5060 from taosdata/feature/share_rpc1
Feature/share rpc1
上级
f3bcb2f3
236c4654
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
195 addition
and
107 deletion
+195
-107
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+11
-4
src/client/src/tscServer.c
src/client/src/tscServer.c
+12
-8
src/client/src/tscSql.c
src/client/src/tscSql.c
+18
-21
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+71
-28
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+1
-8
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+82
-38
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
2ab56e4f
...
...
@@ -297,6 +297,11 @@ typedef struct {
struct
SLocalMerger
*
pLocalMerger
;
}
SSqlRes
;
typedef
struct
{
char
key
[
512
];
void
*
pDnodeConn
;
}
SRpcObj
;
typedef
struct
STscObj
{
void
*
signature
;
void
*
pTimer
;
...
...
@@ -312,8 +317,8 @@ typedef struct STscObj {
int64_t
hbrid
;
struct
SSqlObj
*
sqlList
;
struct
SSqlStream
*
streamList
;
SRpc
CorEpSet
*
tscCorMgmtEpSet
;
void
*
pDnodeConn
;
SRpc
Obj
*
pRpcObj
;
SRpcCorEpSet
*
tscCorMgmtEpSet
;
pthread_mutex_t
mutex
;
int32_t
numOfObj
;
// number of sqlObj from this tscObj
}
STscObj
;
...
...
@@ -390,8 +395,10 @@ typedef struct SSqlStream {
void
tscSetStreamDestTable
(
SSqlStream
*
pStream
,
const
char
*
dstTable
);
int32_t
tscInitRpc
(
const
char
*
user
,
const
char
*
secret
,
void
**
pDnodeConn
);
void
tscInitMsgsFp
();
int
tscAcquireRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secret
,
void
**
pRpcObj
);
void
tscReleaseRpc
(
void
*
param
);
void
tscInitMsgsFp
();
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
initial
);
...
...
src/client/src/tscServer.c
浏览文件 @
2ab56e4f
...
...
@@ -157,13 +157,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SRpcEpSet
*
epSet
=
&
pRsp
->
epSet
;
if
(
epSet
->
numOfEps
>
0
)
{
tscEpSetHtons
(
epSet
);
if
(
!
tscEpSetIsEqual
(
&
pSql
->
pTscObj
->
tscCorMgmtEpSet
->
epSet
,
epSet
))
{
tscTrace
(
"%p updating epset: numOfEps: %d, inUse: %d"
,
pSql
,
epSet
->
numOfEps
,
epSet
->
inUse
);
for
(
int8_t
i
=
0
;
i
<
epSet
->
numOfEps
;
i
++
)
{
tscTrace
(
"endpoint %d: fqdn=%s, port=%d"
,
i
,
epSet
->
fqdn
[
i
],
epSet
->
port
[
i
]);
}
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
}
//SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
//if (!tscEpSetIsEqual(&pCorEpSet->epSet, epSet)) {
// tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse);
// for (int8_t i = 0; i < epSet->numOfEps; i++) {
// tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]);
// }
//}
//concurrency problem, update mgmt epset anyway
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
}
pSql
->
pTscObj
->
connId
=
htonl
(
pRsp
->
connId
);
...
...
@@ -270,7 +273,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.
code
=
0
};
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
rpcSendRequest
(
pObj
->
pRpcObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/client/src/tscSql.c
浏览文件 @
2ab56e4f
...
...
@@ -90,9 +90,11 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
}
else
{
if
(
tscSetMgmtEpSetFromCfg
(
tsFirst
,
tsSecond
,
&
corMgmtEpSet
)
<
0
)
return
NULL
;
}
char
rpcKey
[
512
]
=
{
0
};
snprintf
(
rpcKey
,
sizeof
(
rpcKey
),
"%s:%s:%s:%d"
,
user
,
pass
,
ip
,
port
);
void
*
p
DnodeConn
=
NULL
;
if
(
tsc
InitRpc
(
user
,
secretEncrypt
,
&
pDnodeConn
)
!=
0
)
{
void
*
p
RpcObj
=
NULL
;
if
(
tsc
AcquireRpc
(
rpcKey
,
user
,
secretEncrypt
,
&
pRpcObj
)
!=
0
)
{
terrno
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
return
NULL
;
}
...
...
@@ -100,23 +102,21 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
STscObj
*
pObj
=
(
STscObj
*
)
calloc
(
1
,
sizeof
(
STscObj
));
if
(
NULL
==
pObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
tscReleaseRpc
(
pRpcObj
);
return
NULL
;
}
// set up tscObj's mgmtEpSet
pObj
->
tscCorMgmtEpSet
=
(
SRpcCorEpSet
*
)
malloc
(
sizeof
(
SRpcCorEpSet
));
if
(
NULL
==
pObj
->
tscCorMgmtEpSet
)
{
pObj
->
tscCorMgmtEpSet
=
malloc
(
sizeof
(
SRpcCorEpSet
));
if
(
pObj
->
tscCorMgmtEpSet
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
free
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
);
return
NULL
;
}
memcpy
(
pObj
->
tscCorMgmtEpSet
,
&
corMgmtEpSet
,
sizeof
(
SRpcCorEpSet
));
pObj
->
signature
=
pObj
;
pObj
->
pDnodeConn
=
pDnodeConn
;
memcpy
(
pObj
->
tscCorMgmtEpSet
,
&
corMgmtEpSet
,
sizeof
(
corMgmtEpSet
));
pObj
->
signature
=
pObj
;
pObj
->
pRpcObj
=
(
SRpcObj
*
)
pRpcObj
;
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
secretEncryptLen
=
MIN
(
secretEncryptLen
,
sizeof
(
pObj
->
pass
));
memcpy
(
pObj
->
pass
,
secretEncrypt
,
secretEncryptLen
);
...
...
@@ -126,8 +126,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
/* db name is too long */
if
(
len
>=
TSDB_DB_NAME_LEN
)
{
terrno
=
TSDB_CODE_TSC_INVALID_DB_LENGTH
;
rpcClose
(
pDnodeConn
);
free
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
);
return
NULL
;
}
...
...
@@ -144,8 +143,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
NULL
==
pSql
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
free
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
);
return
NULL
;
}
...
...
@@ -161,9 +159,8 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pSql
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
tscReleaseRpc
(
pRpcObj
);
free
(
pSql
);
free
(
pObj
->
tscCorMgmtEpSet
);
free
(
pObj
);
return
NULL
;
}
...
...
@@ -202,7 +199,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return
NULL
;
}
tscDebug
(
"%p DB connection is opening,
dnodeConn:%p"
,
pObj
,
p
Obj
->
pDnodeConn
);
tscDebug
(
"%p DB connection is opening,
rpcObj: %p, dnodeConn:%p"
,
pObj
,
pObj
->
pRpcObj
,
pObj
->
pRpc
Obj
->
pDnodeConn
);
taos_free_result
(
pSql
);
// version compare only requires the first 3 segments of the version string
...
...
@@ -279,7 +276,7 @@ void taos_close(TAOS *taos) {
return
;
}
tscDebug
(
"%p try to free tscObj
and close dnodeConn:%p"
,
pObj
,
pObj
->
pDnodeConn
);
tscDebug
(
"%p try to free tscObj
"
,
pObj
);
if
(
pObj
->
signature
!=
pObj
)
{
tscDebug
(
"%p already closed or invalid tscObj"
,
pObj
);
return
;
...
...
@@ -303,7 +300,7 @@ void taos_close(TAOS *taos) {
}
}
tscDebug
(
"%p all sqlObj are freed, free tscObj
and close dnodeConn:%p"
,
pObj
,
pObj
->
pDnodeConn
);
tscDebug
(
"%p all sqlObj are freed, free tscObj
"
,
pObj
);
taosRemoveRef
(
tscRefId
,
pObj
->
rid
);
}
...
...
src/client/src/tscSystem.c
浏览文件 @
2ab56e4f
...
...
@@ -43,41 +43,74 @@ void *tscTmr;
void
*
tscQhandle
;
int32_t
tscRefId
=
-
1
;
int32_t
tscNumOfObj
=
0
;
// number of sqlObj in current process.
static
void
*
tscCheckDiskUsageTmr
;
void
*
tscRpcCache
;
// cache to keep rpc obj
int32_t
tscNumOfThreads
=
1
;
// num of rpc threads
static
pthread_mutex_t
rpcObjMutex
;
// mutex to protect open the rpc obj concurrently
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
void
tscCheckDiskUsage
(
void
*
UNUSED_PARAM
(
para
),
void
*
UNUSED_PARAM
(
param
))
{
taosGetDisk
();
taosTmrReset
(
tscCheckDiskUsage
,
1000
,
NULL
,
tscTmr
,
&
tscCheckDiskUsageTmr
);
}
void
tscFreeRpcObj
(
void
*
param
)
{
assert
(
param
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)(
param
);
tscDebug
(
"free rpcObj:%p and free pDnodeConn: %p"
,
pRpcObj
,
pRpcObj
->
pDnodeConn
);
rpcClose
(
pRpcObj
->
pDnodeConn
);
}
int32_t
tscInitRpc
(
const
char
*
user
,
const
char
*
secretEncrypt
,
void
**
pDnodeConn
)
{
SRpcInit
rpcInit
;
if
(
*
pDnodeConn
==
NULL
)
{
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
1
;
// every DB connection has only one thread
rpcInit
.
cfp
=
tscProcessMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
idleTime
=
2000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
*
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
*
pDnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to TDengine"
);
return
-
1
;
}
else
{
tscDebug
(
"dnodeConn:%p is created, user:%s"
,
*
pDnodeConn
,
user
);
}
void
tscReleaseRpc
(
void
*
param
)
{
if
(
param
==
NULL
)
{
return
;
}
pthread_mutex_lock
(
&
rpcObjMutex
);
taosCacheRelease
(
tscRpcCache
,
(
void
*
)
&
param
,
false
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
}
int32_t
tscAcquireRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secretEncrypt
,
void
**
ppRpcObj
)
{
pthread_mutex_lock
(
&
rpcObjMutex
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)
taosCacheAcquireByKey
(
tscRpcCache
,
key
,
strlen
(
key
));
if
(
pRpcObj
!=
NULL
)
{
*
ppRpcObj
=
pRpcObj
;
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
0
;
}
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
tscNumOfThreads
*
2
;
rpcInit
.
cfp
=
tscProcessMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
idleTime
=
2000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
SRpcObj
rpcObj
;
memset
(
&
rpcObj
,
0
,
sizeof
(
rpcObj
));
strncpy
(
rpcObj
.
key
,
key
,
strlen
(
key
));
rpcObj
.
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
rpcObj
.
pDnodeConn
==
NULL
)
{
pthread_mutex_unlock
(
&
rpcObjMutex
);
tscError
(
"failed to init connection to TDengine"
);
return
-
1
;
}
pRpcObj
=
taosCachePut
(
tscRpcCache
,
rpcObj
.
key
,
strlen
(
rpcObj
.
key
),
&
rpcObj
,
sizeof
(
rpcObj
),
1000
*
10
);
if
(
pRpcObj
==
NULL
)
{
rpcClose
(
rpcObj
.
pDnodeConn
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
-
1
;
}
*
ppRpcObj
=
pRpcObj
;
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
0
;
}
...
...
@@ -118,7 +151,7 @@ void taos_init_imp(void) {
int
queueSize
=
tsMaxConnections
*
2
;
double
factor
=
(
tscEmbedded
==
0
)
?
2
.
0
:
4
.
0
;
int32_t
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
if
(
tscNumOfThreads
<
2
)
{
tscNumOfThreads
=
2
;
}
...
...
@@ -140,6 +173,10 @@ void taos_init_imp(void) {
tscTableMetaInfo
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tscDebug
(
"TableMeta:%p"
,
tscTableMetaInfo
);
}
int
refreshTime
=
5
;
tscRpcCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
true
,
tscFreeRpcObj
,
"rpcObj"
);
pthread_mutex_init
(
&
rpcObjMutex
,
NULL
);
tscRefId
=
taosOpenRef
(
200
,
tscCloseTscObj
);
...
...
@@ -181,10 +218,16 @@ void taos_cleanup(void) {
taosCleanupKeywordsTable
();
taosCloseLog
();
if
(
tscEmbedded
==
0
)
{
rpcCleanup
();
p
=
tscRpcCache
;
tscRpcCache
=
NULL
;
if
(
p
!=
NULL
)
{
taosCacheCleanup
(
p
);
pthread_mutex_destroy
(
&
rpcObjMutex
);
}
if
(
tscEmbedded
==
0
)
rpcCleanup
();
p
=
tscTmr
;
tscTmr
=
NULL
;
taosTmrCleanUp
(
p
);
...
...
src/client/src/tscUtil.c
浏览文件 @
2ab56e4f
...
...
@@ -447,7 +447,6 @@ void tscFreeRegisteredSqlObj(void *pSql) {
SSqlObj
*
p
=
*
(
SSqlObj
**
)
pSql
;
STscObj
*
pTscObj
=
p
->
pTscObj
;
assert
(
RID_VALID
(
p
->
self
));
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfObj
,
1
);
...
...
@@ -898,16 +897,10 @@ void tscCloseTscObj(void *param) {
pObj
->
signature
=
NULL
;
taosTmrStopA
(
&
(
pObj
->
pTimer
));
void
*
p
=
pObj
->
pDnodeConn
;
if
(
pObj
->
pDnodeConn
!=
NULL
)
{
rpcClose
(
pObj
->
pDnodeConn
);
pObj
->
pDnodeConn
=
NULL
;
}
tfree
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pObj
->
pRpcObj
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
tscDebug
(
"%p DB connection is closed, dnodeConn:%p"
,
pObj
,
p
);
tfree
(
pObj
);
}
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
2ab56e4f
...
...
@@ -48,6 +48,13 @@ typedef struct SThreadObj {
void
*
(
*
processData
)(
SRecvInfo
*
pPacket
);
}
SThreadObj
;
typedef
struct
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SThreadObj
**
pThreadObj
;
}
SClientObj
;
typedef
struct
{
SOCKET
fd
;
uint32_t
ip
;
...
...
@@ -116,6 +123,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj
->
processData
=
fp
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
stop
=
false
;
}
// initialize mutex, thread, fd which may fail
...
...
@@ -166,6 +174,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
static
void
taosStopTcpThread
(
SThreadObj
*
pThreadObj
)
{
if
(
pThreadObj
==
NULL
)
{
return
;}
// save thread into local variable and signal thread to stop
pthread_t
thread
=
pThreadObj
->
thread
;
if
(
!
taosCheckPthreadValid
(
thread
))
{
...
...
@@ -282,49 +291,76 @@ static void *taosAcceptTcpConnection(void *arg) {
return
NULL
;
}
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
)
{
SThreadObj
*
pThreadObj
;
pthread_attr_t
thattr
;
pThreadObj
=
(
SThreadObj
*
)
malloc
(
sizeof
(
SThreadObj
));
memset
(
pThreadObj
,
0
,
sizeof
(
SThreadObj
));
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
ip
=
ip
;
pThreadObj
->
shandle
=
shandle
;
if
(
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP client mutex(%s)"
,
label
,
strerror
(
errno
));
free
(
pThreadObj
);
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
pClientObj
=
(
SClientObj
*
)
calloc
(
1
,
sizeof
(
SClientObj
));
if
(
pClientObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
}
pThreadObj
->
pollFd
=
(
EpollFd
)
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP client epoll"
,
label
);
free
(
pThreadObj
);
tstrncpy
(
pClientObj
->
label
,
label
,
sizeof
(
pClientObj
->
label
));
pClientObj
->
numOfThreads
=
numOfThreads
;
pClientObj
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
numOfThreads
,
sizeof
(
SThreadObj
*
));
if
(
pClientObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
tfree
(
pClientObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pThreadObj
->
processData
=
fp
;
int
code
=
0
;
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
int
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
pthread_attr_destroy
(
&
thattr
);
if
(
code
!=
0
)
{
EpollClose
(
pThreadObj
->
pollFd
);
pThreadObj
->
pollFd
=
-
1
;
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"%s failed to create TCP read data thread(%s)"
,
label
,
strerror
(
errno
));
return
NULL
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
if
(
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
for
(
int
j
=
0
;
j
<
i
;
++
j
)
free
(
pClientObj
->
pThreadObj
[
j
]);
free
(
pClientObj
);
pthread_attr_destroy
(
&
thattr
);
return
NULL
;
}
pClientObj
->
pThreadObj
[
i
]
=
pThreadObj
;
taosResetPthread
(
&
pThreadObj
->
thread
);
pThreadObj
->
ip
=
ip
;
pThreadObj
->
stop
=
false
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
processData
=
fp
;
}
tDebug
(
"%s TCP client is initialized, ip:%u:%hu"
,
label
,
ip
,
port
);
// initialize mutex, thread, fd which may fail
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
i
];
code
=
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
);
if
(
code
<
0
)
{
tError
(
"%s failed to init TCP process data mutex(%s)"
,
label
,
strerror
(
errno
));
break
;
}
return
pThreadObj
;
pThreadObj
->
pollFd
=
(
int64_t
)
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
code
=
-
1
;
break
;
}
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
if
(
code
!=
0
)
{
tError
(
"%s failed to create TCP process data thread(%s)"
,
label
,
strerror
(
errno
));
break
;
}
pThreadObj
->
threadId
=
i
;
}
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosCleanUpTcpClient
(
pClientObj
);
pClientObj
=
NULL
;
}
return
pClientObj
;
}
void
taosStopTcpClient
(
void
*
chandle
)
{
...
...
@@ -335,15 +371,23 @@ void taosStopTcpClient(void *chandle) {
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
tDebug
(
"%s TCP client will be cleaned up"
,
pThreadObj
->
label
);
taosStopTcpThread
(
pThreadObj
);
SClientObj
*
pClientObj
=
chandle
;
if
(
pClientObj
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pClientObj
->
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
i
];
taosStopTcpThread
(
pThreadObj
);
}
tDebug
(
"%s TCP client is cleaned up"
,
pClientObj
->
label
);
tfree
(
pClientObj
->
pThreadObj
);
tfree
(
pClientObj
);
}
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
)
{
SThreadObj
*
pThreadObj
=
shandle
;
SClientObj
*
pClientObj
=
shandle
;
int32_t
index
=
atomic_load_32
(
&
pClientObj
->
index
)
%
pClientObj
->
numOfThreads
;
atomic_store_32
(
&
pClientObj
->
index
,
index
+
1
);
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
index
];
SOCKET
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pThreadObj
->
ip
);
if
(
fd
<=
0
)
return
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录