Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
19f161dd
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
19f161dd
编写于
1月 07, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
1月 07, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9663 from taosdata/feature/tq
refine heartbeat framework
上级
61136183
7583b45e
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
152 addition
and
128 deletion
+152
-128
include/common/tmsg.h
include/common/tmsg.h
+74
-6
source/client/inc/clientHb.h
source/client/inc/clientHb.h
+11
-106
source/client/src/clientHb.c
source/client/src/clientHb.c
+32
-15
source/common/src/tmsg.c
source/common/src/tmsg.c
+35
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
19f161dd
...
...
@@ -26,6 +26,7 @@ extern "C" {
#include "tarray.h"
#include "tcoding.h"
#include "tdataformat.h"
#include "thash.h"
#include "tlist.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
...
...
@@ -132,6 +133,73 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef
struct
SKlv
{
int32_t
keyLen
;
int32_t
valueLen
;
void
*
key
;
void
*
value
;
}
SKlv
;
static
FORCE_INLINE
int
taosEncodeSKlv
(
void
**
buf
,
const
SKlv
*
pKlv
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
valueLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
key
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
value
,
pKlv
->
valueLen
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSKlv
(
void
*
buf
,
SKlv
*
pKlv
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
keyLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
valueLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
key
,
pKlv
->
keyLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
value
,
pKlv
->
valueLen
);
return
buf
;
}
typedef
struct
SClientHbKey
{
int32_t
connId
;
int32_t
hbType
;
}
SClientHbKey
;
static
FORCE_INLINE
int
taosEncodeSClientHbKey
(
void
**
buf
,
const
SClientHbKey
*
pKey
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
connId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
hbType
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSClientHbKey
(
void
*
buf
,
SClientHbKey
*
pKey
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
connId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
hbType
);
return
buf
;
}
typedef
struct
SClientHbReq
{
SClientHbKey
connKey
;
SHashObj
*
info
;
// hash<Slv.key, Sklv>
}
SClientHbReq
;
typedef
struct
SClientHbBatchReq
{
int64_t
reqId
;
SArray
*
reqs
;
// SArray<SClientHbReq>
}
SClientHbBatchReq
;
int
tSerializeSClientHbReq
(
void
**
buf
,
const
SClientHbReq
*
pReq
);
void
*
tDeserializeClientHbReq
(
void
*
buf
,
SClientHbReq
*
pReq
);
typedef
struct
SClientHbRsp
{
SClientHbKey
connKey
;
int32_t
status
;
int32_t
bodyLen
;
void
*
body
;
}
SClientHbRsp
;
typedef
struct
SClientHbBatchRsp
{
int64_t
reqId
;
int64_t
rspId
;
SArray
*
rsps
;
// SArray<SClientHbRsp>
}
SClientHbBatchRsp
;
typedef
struct
SBuildTableMetaInput
{
int32_t
vgId
;
char
*
dbName
;
...
...
@@ -1123,7 +1191,7 @@ typedef struct {
int32_t
topicNum
;
int64_t
consumerId
;
char
*
consumerGroup
;
SArray
*
topicNames
;
// SArray<char*>
SArray
*
topicNames
;
// SArray<char*>
}
SCMSubscribeReq
;
static
FORCE_INLINE
int
tSerializeSCMSubscribeReq
(
void
**
buf
,
const
SCMSubscribeReq
*
pReq
)
{
...
...
@@ -1132,7 +1200,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
consumerGroup
);
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
tlen
+=
taosEncodeString
(
buf
,
(
char
*
)
taosArrayGetP
(
pReq
->
topicNames
,
i
));
}
return
tlen
;
...
...
@@ -1143,7 +1211,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
consumerGroup
);
pReq
->
topicNames
=
taosArrayInit
(
pReq
->
topicNum
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
char
*
name
=
NULL
;
buf
=
taosDecodeString
(
buf
,
&
name
);
taosArrayPush
(
pReq
->
topicNames
,
&
name
);
...
...
@@ -1158,14 +1226,14 @@ typedef struct SMqSubTopic {
}
SMqSubTopic
;
typedef
struct
{
int32_t
topicNum
;
int32_t
topicNum
;
SMqSubTopic
topics
[];
}
SCMSubscribeRsp
;
static
FORCE_INLINE
int
tSerializeSCMSubscribeRsp
(
void
**
buf
,
const
SCMSubscribeRsp
*
pRsp
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
topicNum
);
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
topics
[
i
].
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
topics
[
i
].
topicId
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pRsp
->
topics
[
i
].
epSet
);
...
...
@@ -1175,7 +1243,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe
static
FORCE_INLINE
void
*
tDeserializeSCMSubscribeRsp
(
void
*
buf
,
SCMSubscribeRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
topicNum
);
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
topics
[
i
].
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
topics
[
i
].
topicId
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pRsp
->
topics
[
i
].
epSet
);
...
...
source/client/inc/clientHb.h
浏览文件 @
19f161dd
...
...
@@ -20,118 +20,21 @@
typedef
enum
{
mq
=
0
,
// type can be added here
//
HEARTBEAT_TYPE_MAX
}
EHbType
;
typedef
struct
SKlv
{
int32_t
keyLen
;
int32_t
valueLen
;
void
*
key
;
void
*
value
;
}
SKlv
;
static
FORCE_INLINE
int
taosEncodeSKlv
(
void
**
buf
,
const
SKlv
*
pKlv
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
valueLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
key
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
value
,
pKlv
->
valueLen
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSKlv
(
void
*
buf
,
SKlv
*
pKlv
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
keyLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
valueLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
key
,
pKlv
->
keyLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
value
,
pKlv
->
valueLen
);
return
buf
;
}
typedef
struct
SClientHbKey
{
int32_t
connId
;
int32_t
hbType
;
}
SClientHbKey
;
static
FORCE_INLINE
int
taosEncodeSClientHbKey
(
void
**
buf
,
const
SClientHbKey
*
pKey
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
connId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
hbType
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSClientHbKey
(
void
*
buf
,
SClientHbKey
*
pKey
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
connId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
hbType
);
return
buf
;
}
typedef
struct
SClientHbReq
{
SClientHbKey
hbKey
;
SHashObj
*
info
;
// hash<Sklv>
}
SClientHbReq
;
static
FORCE_INLINE
int
tSerializeSClientHbReq
(
void
**
buf
,
const
SClientHbReq
*
pReq
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeSClientHbKey
(
buf
,
&
pReq
->
hbKey
);
void
*
pIter
=
NULL
;
void
*
data
;
SKlv
klv
;
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
while
(
data
!=
NULL
)
{
taosHashGetKey
(
data
,
&
klv
.
key
,
(
size_t
*
)
&
klv
.
keyLen
);
klv
.
valueLen
=
taosHashGetDataLen
(
data
);
klv
.
value
=
data
;
taosEncodeSKlv
(
buf
,
&
klv
);
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDeserializeClientHbReq
(
void
*
buf
,
SClientHbReq
*
pReq
)
{
ASSERT
(
pReq
->
info
!=
NULL
);
buf
=
taosDecodeSClientHbKey
(
buf
,
&
pReq
->
hbKey
);
//TODO: error handling
if
(
pReq
->
info
==
NULL
)
{
pReq
->
info
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
}
SKlv
klv
;
buf
=
taosDecodeSKlv
(
buf
,
&
klv
);
taosHashPut
(
pReq
->
info
,
klv
.
key
,
klv
.
keyLen
,
klv
.
value
,
klv
.
valueLen
);
return
buf
;
}
typedef
struct
SClientHbBatchReq
{
int64_t
reqId
;
SArray
*
reqs
;
// SArray<SClientHbReq>
}
SClientHbBatchReq
;
typedef
struct
SClientHbHandleResult
{
}
SClientHbHandleResult
;
typedef
struct
SClientHbRsp
{
int32_t
connId
;
int32_t
hbType
;
}
SClientHbRsp
;
typedef
struct
SClientHbBatchRsp
{
int64_t
reqId
;
int64_t
rspId
;
SArray
*
rsps
;
// SArray<SClientHbRsp>
}
SClientHbBatchRsp
;
typedef
int32_t
(
*
FHbRspHandle
)(
SClientHbReq
*
pReq
);
typedef
int32_t
(
*
FGetConnInfo
)(
int32_t
conn
,
void
*
self
);
typedef
int32_t
(
*
FHbRspHandle
)(
SClientHbRsp
*
pReq
);
typedef
int32_t
(
*
FGetConnInfo
)(
SClientHbKey
connKey
,
void
*
param
);
typedef
struct
SClientHbMgr
{
int8_t
inited
;
int32_t
reportInterval
;
// unit ms
int32_t
stats
;
SRWLatch
lock
;
SHashObj
*
info
;
//hash<SClientHbKey, SClientHbReq>
SHashObj
*
activeInfo
;
// hash<SClientHbKey, SClientHbReq>
SHashObj
*
getInfoFuncs
;
// hash<SClientHbKey, FGetConnInfo>
FHbRspHandle
handle
[
HEARTBEAT_TYPE_MAX
];
// input queue
}
SClientHbMgr
;
...
...
@@ -140,9 +43,11 @@ static SClientHbMgr clientHbMgr = {0};
int
hbMgrInit
();
void
hbMgrCleanUp
();
int
hbHandleRsp
(
void
*
hbMsg
);
int
hbRegisterConn
(
SClientHbKey
connKey
,
FGetConnInfo
func
);
int
registerConn
(
int32_t
connId
,
FGetConnInfo
func
,
FHbRspHandle
rspHandle
);
int
registerHbRspHandle
(
int32_t
connId
,
int32_t
hbType
,
FHbRspHandle
rspHandle
);
int
hbAddConnInfo
(
SClientHbKey
connKey
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
);
int
HbAddConnInfo
(
int32_t
connId
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
);
source/client/src/clientHb.c
浏览文件 @
19f161dd
...
...
@@ -15,45 +15,62 @@
#include "clientHb.h"
static
int32_t
mqHbRspHandle
(
SClientHbR
eq
*
pReq
)
{
static
int32_t
mqHbRspHandle
(
SClientHbR
sp
*
pReq
)
{
return
0
;
}
uint32_t
hbKeyHashFunc
(
const
char
*
key
,
uint32_t
keyLen
)
{
return
0
;
}
static
void
hbMgrInitMqHbFunc
()
{
clientHbMgr
.
handle
[
mq
]
=
mqHbRspHandle
;
}
int
hbMgrInit
()
{
//init once
//
//init lock
//
//init
handle funcs
clientHbMgr
.
handle
[
mq
]
=
mqHbRspHandle
;
int8_t
old
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
//init
config
clientHbMgr
.
reportInterval
=
1500
;
//init stat
clientHbMgr
.
stats
=
0
;
//init config
clientHbMgr
.
reportInterval
=
1500
;
//init lock
taosInitRWLatch
(
&
clientHbMgr
.
lock
);
//init handle funcs
hbMgrInitMqHbFunc
();
//init hash info
//
clientHbMgr
.
activeInfo
=
taosHashInit
(
64
,
hbKeyHashFunc
,
1
,
HASH_ENTRY_LOCK
);
//init getInfoFunc
clientHbMgr
.
getInfoFuncs
=
taosHashInit
(
64
,
hbKeyHashFunc
,
1
,
HASH_ENTRY_LOCK
);
return
0
;
}
void
hbMgrCleanUp
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
inited
,
1
,
0
);
if
(
old
==
0
)
return
;
taosHashCleanup
(
clientHbMgr
.
activeInfo
);
taosHashCleanup
(
clientHbMgr
.
getInfoFuncs
);
}
int
registerConn
(
int32_t
connId
,
FGetConnInfo
func
,
FHbRspHandle
rspHandle
)
{
return
0
;
}
int
registerHbRspHandle
(
int32_t
connId
,
int32_t
hbType
,
FHbRspHandle
rspHandle
)
{
int
hbRegisterConn
(
SClientHbKey
connKey
,
FGetConnInfo
func
)
{
return
0
;
}
int
HbAddConnInfo
(
int32_t
connId
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
)
{
int
hbAddConnInfo
(
SClientHbKey
connKey
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
)
{
//lock
//find req by connection id
SClientHbReq
*
data
=
taosHashGet
(
clientHbMgr
.
activeInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
ASSERT
(
data
!=
NULL
);
taosHashPut
(
data
->
info
,
key
,
keyLen
,
value
,
valueLen
);
//unlock
return
0
;
...
...
source/common/src/tmsg.c
浏览文件 @
19f161dd
...
...
@@ -27,6 +27,40 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int
tSerializeSClientHbReq
(
void
**
buf
,
const
SClientHbReq
*
pReq
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeSClientHbKey
(
buf
,
&
pReq
->
connKey
);
void
*
pIter
=
NULL
;
void
*
data
;
SKlv
klv
;
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
while
(
data
!=
NULL
)
{
taosHashGetKey
(
data
,
&
klv
.
key
,
(
size_t
*
)
&
klv
.
keyLen
);
klv
.
valueLen
=
taosHashGetDataLen
(
data
);
klv
.
value
=
data
;
taosEncodeSKlv
(
buf
,
&
klv
);
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
}
return
tlen
;
}
void
*
tDeserializeClientHbReq
(
void
*
buf
,
SClientHbReq
*
pReq
)
{
ASSERT
(
pReq
->
info
!=
NULL
);
buf
=
taosDecodeSClientHbKey
(
buf
,
&
pReq
->
connKey
);
// TODO: error handling
if
(
pReq
->
info
==
NULL
)
{
pReq
->
info
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
}
SKlv
klv
;
buf
=
taosDecodeSKlv
(
buf
,
&
klv
);
taosHashPut
(
pReq
->
info
,
klv
.
key
,
klv
.
keyLen
,
klv
.
value
,
klv
.
valueLen
);
return
buf
;
}
int
tSerializeSVCreateTbReq
(
void
**
buf
,
SVCreateTbReq
*
pReq
)
{
int
tlen
=
0
;
...
...
@@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
}
return
buf
;
}
\ No newline at end of file
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录