Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
933d08b4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
933d08b4
编写于
6月 13, 2023
作者:
H
Haojun Liao
提交者:
GitHub
6月 13, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21690 from taosdata/feature/TS-3495
feat:add parameters for consumer & add offset rows for subscription
上级
5673d341
fb178816
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
864 addition
and
258 deletion
+864
-258
include/common/tmsg.h
include/common/tmsg.h
+44
-14
include/util/tdef.h
include/util/tdef.h
+1
-0
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+65
-20
source/common/src/systable.c
source/common/src/systable.c
+7
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+53
-8
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+13
-5
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+54
-7
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+187
-93
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+110
-94
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+9
-9
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+5
-5
tests/system-test/2-query/odbc.py
tests/system-test/2-query/odbc.py
+2
-2
tests/system-test/7-tmq/checkOffsetRowParams.py
tests/system-test/7-tmq/checkOffsetRowParams.py
+313
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
933d08b4
...
...
@@ -2033,6 +2033,12 @@ typedef struct {
char
cgroup
[
TSDB_CGROUP_LEN
];
char
clientId
[
256
];
SArray
*
topicNames
;
// SArray<char**>
int8_t
withTbName
;
int8_t
useSnapshot
;
int8_t
autoCommit
;
int32_t
autoCommitInterval
;
int8_t
resetOffsetCfg
;
}
SCMSubscribeReq
;
static
FORCE_INLINE
int32_t
tSerializeSCMSubscribeReq
(
void
**
buf
,
const
SCMSubscribeReq
*
pReq
)
{
...
...
@@ -2047,6 +2053,13 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
for
(
int32_t
i
=
0
;
i
<
topicNum
;
i
++
)
{
tlen
+=
taosEncodeString
(
buf
,
(
char
*
)
taosArrayGetP
(
pReq
->
topicNames
,
i
));
}
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
useSnapshot
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
autoCommit
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
autoCommitInterval
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
resetOffsetCfg
);
return
tlen
;
}
...
...
@@ -2064,6 +2077,12 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf
=
taosDecodeString
(
buf
,
&
name
);
taosArrayPush
(
pReq
->
topicNames
,
&
name
);
}
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
useSnapshot
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
autoCommit
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
autoCommitInterval
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
resetOffsetCfg
);
return
buf
;
}
...
...
@@ -2455,15 +2474,6 @@ typedef struct {
char
cgroup
[
TSDB_CGROUP_LEN
];
}
SMqAskEpReq
;
typedef
struct
{
int64_t
consumerId
;
int32_t
epoch
;
}
SMqHbReq
;
typedef
struct
{
int8_t
reserved
;
}
SMqHbRsp
;
typedef
struct
{
int32_t
key
;
int32_t
valueLen
;
...
...
@@ -2891,7 +2901,7 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pRe
// tqOffset
enum
{
TMQ_OFFSET__RESET_NONE
=
-
3
,
TMQ_OFFSET__RESET_EARLIE
A
ST
=
-
2
,
TMQ_OFFSET__RESET_EARLIEST
=
-
2
,
TMQ_OFFSET__RESET_LATEST
=
-
1
,
TMQ_OFFSET__LOG
=
1
,
TMQ_OFFSET__SNAPSHOT_DATA
=
2
,
...
...
@@ -3354,6 +3364,28 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx
(
pRsp
->
topics
,
(
FDelete
)
tDeleteMqSubTopicEp
);
}
typedef
struct
{
int32_t
vgId
;
STqOffsetVal
offset
;
int64_t
rows
;
}
OffsetRows
;
typedef
struct
{
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
offsetRows
;
}
TopicOffsetRows
;
typedef
struct
{
int64_t
consumerId
;
int32_t
epoch
;
SArray
*
topics
;
}
SMqHbReq
;
typedef
struct
{
int8_t
reserved
;
}
SMqHbRsp
;
#define TD_AUTO_CREATE_TABLE 0x1
typedef
struct
{
int64_t
suid
;
...
...
@@ -3478,10 +3510,8 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t
tDeserializeSMqAskEpReq
(
void
*
buf
,
int32_t
bufLen
,
SMqAskEpReq
*
pReq
);
int32_t
tSerializeSMqHbReq
(
void
*
buf
,
int32_t
bufLen
,
SMqHbReq
*
pReq
);
int32_t
tDeserializeSMqHbReq
(
void
*
buf
,
int32_t
bufLen
,
SMqHbReq
*
pReq
);
int32_t
tSerializeSMqAskEpReq
(
void
*
buf
,
int32_t
bufLen
,
SMqAskEpReq
*
pReq
);
int32_t
tDeserializeSMqAskEpReq
(
void
*
buf
,
int32_t
bufLen
,
SMqAskEpReq
*
pReq
);
int32_t
tSerializeSMqHbReq
(
void
*
buf
,
int32_t
bufLen
,
SMqHbReq
*
pReq
);
int32_t
tDeserializeSMqHbReq
(
void
*
buf
,
int32_t
bufLen
,
SMqHbReq
*
pReq
);
int32_t
tDeatroySMqHbReq
(
SMqHbReq
*
pReq
);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
...
...
include/util/tdef.h
浏览文件 @
933d08b4
...
...
@@ -195,6 +195,7 @@ typedef enum ELogicConditionType {
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
...
...
source/client/src/clientTmq.c
浏览文件 @
933d08b4
...
...
@@ -82,7 +82,7 @@ struct tmq_t {
int8_t
useSnapshot
;
int8_t
autoCommit
;
int32_t
autoCommitInterval
;
int
32_t
resetOffsetCfg
;
int
8_t
resetOffsetCfg
;
uint64_t
consumerId
;
bool
hbBgEnable
;
tmq_commit_cb
*
commitCb
;
...
...
@@ -99,6 +99,7 @@ struct tmq_t {
// poll info
int64_t
pollCnt
;
int64_t
totalRows
;
bool
needReportOffsetRows
;
// timer
tmr_h
hbLiveTimer
;
...
...
@@ -264,7 +265,7 @@ tmq_conf_t* tmq_conf_new() {
conf
->
withTbName
=
false
;
conf
->
autoCommit
=
true
;
conf
->
autoCommitInterval
=
DEFAULT_AUTO_COMMIT_INTERVAL
;
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIE
A
ST
;
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIEST
;
conf
->
hbBgEnable
=
true
;
return
conf
;
...
...
@@ -318,7 +319,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
conf
->
resetOffset
=
TMQ_OFFSET__RESET_NONE
;
return
TMQ_CONF_OK
;
}
else
if
(
strcasecmp
(
value
,
"earliest"
)
==
0
)
{
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIE
A
ST
;
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIEST
;
return
TMQ_CONF_OK
;
}
else
if
(
strcasecmp
(
value
,
"latest"
)
==
0
)
{
conf
->
resetOffset
=
TMQ_OFFSET__RESET_LATEST
;
...
...
@@ -567,10 +568,10 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
atomic_add_fetch_32
(
&
pParamSet
->
totalRspNum
,
1
);
SEp
*
pEp
=
GET_ACTIVE_EP
(
&
pVg
->
epSet
);
char
offsetBuf
[
80
]
=
{
0
};
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
&
pOffset
->
offset
.
val
);
char
commitBuf
[
80
]
=
{
0
};
char
commitBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%"
PRIx64
,
tmq
->
consumerId
,
pOffset
->
offset
.
subKey
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
pEp
->
fqdn
,
pEp
->
port
,
index
+
1
,
...
...
@@ -796,6 +797,25 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMqHbReq
req
=
{
0
};
req
.
consumerId
=
tmq
->
consumerId
;
req
.
epoch
=
tmq
->
epoch
;
if
(
tmq
->
needReportOffsetRows
){
req
.
topics
=
taosArrayInit
(
taosArrayGetSize
(
tmq
->
clientTopics
),
sizeof
(
TopicOffsetRows
));
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
){
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
int32_t
numOfVgroups
=
taosArrayGetSize
(
pTopic
->
vgs
);
TopicOffsetRows
*
data
=
taosArrayReserve
(
req
.
topics
,
1
);
strcpy
(
data
->
topicName
,
pTopic
->
topicName
);
data
->
offsetRows
=
taosArrayInit
(
numOfVgroups
,
sizeof
(
OffsetRows
));
for
(
int
j
=
0
;
j
<
numOfVgroups
;
j
++
){
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
OffsetRows
*
offRows
=
taosArrayReserve
(
data
->
offsetRows
,
1
);
offRows
->
vgId
=
pVg
->
vgId
;
offRows
->
rows
=
pVg
->
numOfRows
;
offRows
->
offset
=
pVg
->
offsetInfo
.
committedOffset
;
tscDebug
(
"report offset: %d"
,
offRows
->
offset
.
type
);
}
}
tmq
->
needReportOffsetRows
=
false
;
}
int32_t
tlen
=
tSerializeSMqHbReq
(
NULL
,
0
,
&
req
);
if
(
tlen
<
0
)
{
...
...
@@ -835,6 +855,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
OVER:
tDeatroySMqHbReq
(
&
req
);
taosTmrReset
(
tmqSendHbReq
,
1000
,
param
,
tmqMgmt
.
timer
,
&
tmq
->
hbLiveTimer
);
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
}
...
...
@@ -969,6 +990,14 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
}
int32_t
tmq_unsubscribe
(
tmq_t
*
tmq
)
{
if
(
tmq
->
autoCommit
)
{
int32_t
rsp
=
tmq_commit_sync
(
tmq
,
NULL
);
if
(
rsp
!=
0
)
{
return
rsp
;
}
}
taosSsleep
(
2
);
// sleep 2s for hb to send offset and rows to server
int32_t
rsp
;
int32_t
retryCnt
=
0
;
tmq_list_t
*
lst
=
tmq_list_new
();
...
...
@@ -1063,6 +1092,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq
->
status
=
TMQ_CONSUMER_STATUS__INIT
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
pTmq
->
needReportOffsetRows
=
true
;
// set conf
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
...
...
@@ -1107,7 +1137,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq
->
hbLiveTimer
=
taosTmrStart
(
tmqSendHbReq
,
1000
,
pRefId
,
tmqMgmt
.
timer
);
}
char
buf
[
80
]
=
{
0
};
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
STqOffsetVal
offset
=
{.
type
=
pTmq
->
resetOffsetCfg
};
tFormatOffset
(
buf
,
tListLen
(
buf
),
&
offset
);
tscInfo
(
"consumer:0x%"
PRIx64
" is setup, refId:%"
PRId64
...
...
@@ -1123,7 +1153,7 @@ _failed:
}
int32_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
)
{
const
int32_t
MAX_RETRY_COUNT
=
120
*
60
;
// let's wait for 2 mins at most
const
int32_t
MAX_RETRY_COUNT
=
120
*
2
;
// let's wait for 2 mins at most
const
SArray
*
container
=
&
topic_list
->
container
;
int32_t
sz
=
taosArrayGetSize
(
container
);
void
*
buf
=
NULL
;
...
...
@@ -1143,6 +1173,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
goto
FAIL
;
}
req
.
withTbName
=
tmq
->
withTbName
;
req
.
useSnapshot
=
tmq
->
useSnapshot
;
req
.
autoCommit
=
tmq
->
autoCommit
;
req
.
autoCommitInterval
=
tmq
->
autoCommitInterval
;
req
.
resetOffsetCfg
=
tmq
->
resetOffsetCfg
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
=
taosArrayGetP
(
container
,
i
);
...
...
@@ -1375,8 +1411,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear
(
&
decoder
);
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pRspWrapper
->
dataRsp
.
rspOffset
);
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pRspWrapper
->
dataRsp
.
rspOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" recv poll rsp, vgId:%d, req ver:%"
PRId64
", rsp:%s type %d, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
.
version
,
buf
,
rspType
,
requestId
);
}
else
if
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
)
{
...
...
@@ -1523,8 +1559,8 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SMqClientVg
*
pVgCur
=
taosArrayGet
(
pTopicCur
->
vgs
,
j
);
makeTopicVgroupKey
(
vgKey
,
pTopicCur
->
topicName
,
pVgCur
->
vgId
);
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pVgCur
->
offsetInfo
.
currentOffset
);
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVgCur
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
", epoch:%d vgId:%d vgKey:%s, offset:%s"
,
tmq
->
consumerId
,
epoch
,
pVgCur
->
vgId
,
vgKey
,
buf
);
...
...
@@ -1673,7 +1709,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
return
pRspObj
;
}
SMqTaosxRspObj
*
tmqBuildTaosxRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
SMqTaosxRspObj
*
tmqBuildTaosxRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
,
SMqClientVg
*
pVg
,
int64_t
*
numOfRows
)
{
SMqTaosxRspObj
*
pRspObj
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqTaosxRspObj
));
pRspObj
->
resType
=
RES_TYPE__TMQ_METADATA
;
tstrncpy
(
pRspObj
->
topic
,
pWrapper
->
topicHandle
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
...
...
@@ -1688,6 +1724,13 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
setResSchemaInfo
(
&
pRspObj
->
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
}
// extract the rows in this data packet
for
(
int32_t
i
=
0
;
i
<
pRspObj
->
rsp
.
blockNum
;
++
i
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
pRspObj
->
rsp
.
blockData
,
i
);
int64_t
rows
=
htobe64
(
pRetrieve
->
numOfRows
);
pVg
->
numOfRows
+=
rows
;
(
*
numOfRows
)
+=
rows
;
}
return
pRspObj
;
}
...
...
@@ -1745,7 +1788,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo
->
msgType
=
TDMT_VND_TMQ_CONSUME
;
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
80
];
char
offsetFormatBuf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%"
PRIx64
,
pTmq
->
consumerId
,
...
...
@@ -1882,8 +1925,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg
->
offsetInfo
.
walVerEnd
=
pDataRsp
->
head
.
walever
;
pVg
->
receivedInfoFromVnode
=
true
;
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pDataRsp
->
rspOffset
);
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pDataRsp
->
rspOffset
);
if
(
pDataRsp
->
blockNum
==
0
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" empty block received, vgId:%d, offset:%s, vg total:%"
PRId64
" total:%"
PRId64
" reqId:0x%"
PRIx64
,
...
...
@@ -1985,13 +2028,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
else
{
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
);
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
tmq
->
totalRows
+=
numOfRows
;
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pVg
->
offsetInfo
.
currentOffset
);
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
", vg total:%"
PRId64
" total:%"
PRId64
" reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
...
...
@@ -2110,6 +2153,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
return
rsp
;
}
}
taosSsleep
(
2
);
// sleep 2s for hb to send offset and rows to server
int32_t
retryCnt
=
0
;
tmq_list_t
*
lst
=
tmq_list_new
();
...
...
@@ -2411,6 +2455,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
// if no more waiting rsp
pParamSet
->
callbackFn
(
tmq
,
pParamSet
->
code
,
pParamSet
->
userParam
);
taosMemoryFree
(
pParamSet
);
tmq
->
needReportOffsetRows
=
true
;
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
return
0
;
...
...
@@ -2608,7 +2653,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo
->
msgType
=
TDMT_VND_TMQ_VG_WALINFO
;
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
80
];
char
offsetFormatBuf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pClientVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%"
PRIx64
,
...
...
@@ -2645,7 +2690,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pOffsetInfo
->
currentOffset
.
type
=
TMQ_OFFSET__LOG
;
char
offsetBuf
[
80
]
=
{
0
};
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
&
pOffsetInfo
->
currentOffset
);
tscDebug
(
"vgId:%d offset is update to:%s"
,
p
->
vgId
,
offsetBuf
);
...
...
source/common/src/systable.c
浏览文件 @
933d08b4
...
...
@@ -291,6 +291,8 @@ static const SSysDbTableSchema subscriptionSchema[] = {
{.
name
=
"consumer_group"
,
.
bytes
=
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
,
.
sysInfo
=
false
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"consumer_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
sysInfo
=
false
},
{.
name
=
"offset"
,
.
bytes
=
TSDB_OFFSET_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
,
.
sysInfo
=
false
},
{.
name
=
"rows"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
sysInfo
=
false
},
};
static
const
SSysDbTableSchema
vnodesSchema
[]
=
{
...
...
@@ -359,6 +361,11 @@ static const SSysDbTableSchema consumerSchema[] = {
{.
name
=
"up_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"subscribe_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"rebalance_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"msg.with.table.name"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"experimental.snapshot.enable"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"enable.auto.commit"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"auto.commit.interval.ms"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"auto.offset.reset"
,
.
bytes
=
TSDB_OFFSET_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
,
.
sysInfo
=
false
},
};
static
const
SSysDbTableSchema
offsetSchema
[]
=
{
...
...
source/common/src/tmsg.c
浏览文件 @
933d08b4
...
...
@@ -5338,6 +5338,15 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
return
0
;
}
int32_t
tDeatroySMqHbReq
(
SMqHbReq
*
pReq
){
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pReq
->
topics
);
i
++
){
TopicOffsetRows
*
vgs
=
taosArrayGet
(
pReq
->
topics
,
i
);
if
(
vgs
)
taosArrayDestroy
(
vgs
->
offsetRows
);
}
taosArrayDestroy
(
pReq
->
topics
);
return
0
;
}
int32_t
tSerializeSMqHbReq
(
void
*
buf
,
int32_t
bufLen
,
SMqHbReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
@@ -5346,6 +5355,21 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if
(
tEncodeI64
(
&
encoder
,
pReq
->
consumerId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
epoch
)
<
0
)
return
-
1
;
int32_t
sz
=
taosArrayGetSize
(
pReq
->
topics
);
if
(
tEncodeI32
(
&
encoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
TopicOffsetRows
*
vgs
=
(
TopicOffsetRows
*
)
taosArrayGet
(
pReq
->
topics
,
i
);
if
(
tEncodeCStr
(
&
encoder
,
vgs
->
topicName
)
<
0
)
return
-
1
;
int32_t
szVgs
=
taosArrayGetSize
(
vgs
->
offsetRows
);
if
(
tEncodeI32
(
&
encoder
,
szVgs
)
<
0
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
szVgs
;
++
j
)
{
OffsetRows
*
offRows
=
taosArrayGet
(
vgs
->
offsetRows
,
j
);
if
(
tEncodeI32
(
&
encoder
,
offRows
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
offRows
->
rows
)
<
0
)
return
-
1
;
if
(
tEncodeSTqOffsetVal
(
&
encoder
,
&
offRows
->
offset
)
<
0
)
return
-
1
;
}
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -5362,7 +5386,28 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
consumerId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
epoch
)
<
0
)
return
-
1
;
int32_t
sz
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
>
0
){
pReq
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
TopicOffsetRows
));
if
(
NULL
==
pReq
->
topics
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
TopicOffsetRows
*
data
=
taosArrayReserve
(
pReq
->
topics
,
1
);
tDecodeCStrTo
(
&
decoder
,
data
->
topicName
);
int32_t
szVgs
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
szVgs
)
<
0
)
return
-
1
;
if
(
szVgs
>
0
){
data
->
offsetRows
=
taosArrayInit
(
szVgs
,
sizeof
(
OffsetRows
));
if
(
NULL
==
data
->
offsetRows
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
szVgs
;
++
j
)
{
OffsetRows
*
offRows
=
taosArrayReserve
(
data
->
offsetRows
,
1
);
if
(
tDecodeI32
(
&
decoder
,
&
offRows
->
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
offRows
->
rows
)
<
0
)
return
-
1
;
if
(
tDecodeSTqOffsetVal
(
&
decoder
,
&
offRows
->
offset
)
<
0
)
return
-
1
;
}
}
}
}
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
@@ -7086,15 +7131,15 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
int32_t
tFormatOffset
(
char
*
buf
,
int32_t
maxLen
,
const
STqOffsetVal
*
pVal
)
{
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_NONE
)
{
snprintf
(
buf
,
maxLen
,
"
offset(reset to none)
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_EARLIE
A
ST
)
{
snprintf
(
buf
,
maxLen
,
"
offset(reset to earlieast)
"
);
snprintf
(
buf
,
maxLen
,
"
none
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
snprintf
(
buf
,
maxLen
,
"
earliest
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_LATEST
)
{
snprintf
(
buf
,
maxLen
,
"
offset(reset to latest)
"
);
snprintf
(
buf
,
maxLen
,
"
latest
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__LOG
)
{
snprintf
(
buf
,
maxLen
,
"
offset(log) ver
:%"
PRId64
,
pVal
->
version
);
snprintf
(
buf
,
maxLen
,
"
log
:%"
PRId64
,
pVal
->
version
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
snprintf
(
buf
,
maxLen
,
"
offset(snapshot) uid:%"
PRId64
" ts:
%"
PRId64
,
pVal
->
uid
,
pVal
->
ts
);
snprintf
(
buf
,
maxLen
,
"
snapshot:%"
PRId64
"|
%"
PRId64
,
pVal
->
uid
,
pVal
->
ts
);
}
else
{
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -7112,7 +7157,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
return
pLeft
->
uid
==
pRight
->
uid
;
}
else
{
ASSERT
(
0
);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIE
A
ST ||*/
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
/*return true;*/
}
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
933d08b4
...
...
@@ -163,7 +163,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_DROP_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_SUBSCRIBE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_ASK_EP
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_HB
,
mmPutMsgTo
Read
Queue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_HB
,
mmPutMsgTo
Write
Queue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_DROP_CGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_DROP_CGROUP_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_TRANS
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
933d08b4
...
...
@@ -551,12 +551,18 @@ typedef struct {
int64_t
upTime
;
int64_t
subscribeTime
;
int64_t
rebalanceTime
;
int8_t
withTbName
;
int8_t
useSnapshot
;
int8_t
autoCommit
;
int32_t
autoCommitInterval
;
int32_t
resetOffsetCfg
;
}
SMqConsumerObj
;
SMqConsumerObj
*
tNewSMqConsumerObj
(
int64_t
consumerId
,
char
cgroup
[
TSDB_CGROUP_LEN
]);
void
tDeleteSMqConsumerObj
(
SMqConsumerObj
*
pConsumer
);
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
);
void
*
tDecodeSMqConsumerObj
(
const
void
*
buf
,
SMqConsumerObj
*
pConsumer
);
void
*
tDecodeSMqConsumerObj
(
const
void
*
buf
,
SMqConsumerObj
*
pConsumer
,
int8_t
sver
);
typedef
struct
{
int32_t
vgId
;
...
...
@@ -572,12 +578,13 @@ void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
typedef
struct
{
int64_t
consumerId
;
// -1 for unassigned
SArray
*
vgs
;
// SArray<SMqVgEp*>
SArray
*
offsetRows
;
// SArray<OffsetRows*>
}
SMqConsumerEp
;
SMqConsumerEp
*
tCloneSMqConsumerEp
(
const
SMqConsumerEp
*
pEp
);
void
tDeleteSMqConsumerEp
(
void
*
pEp
);
//
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
//
void tDeleteSMqConsumerEp(void* pEp);
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
const
SMqConsumerEp
*
pEp
);
void
*
tDecodeSMqConsumerEp
(
const
void
*
buf
,
SMqConsumerEp
*
pEp
);
void
*
tDecodeSMqConsumerEp
(
const
void
*
buf
,
SMqConsumerEp
*
pEp
,
int8_t
sver
);
typedef
struct
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
...
...
@@ -589,6 +596,7 @@ typedef struct {
int64_t
stbUid
;
SHashObj
*
consumerHash
;
// consumerId -> SMqConsumerEp
SArray
*
unassignedVgs
;
// SArray<SMqVgEp*>
SArray
*
offsetRows
;
char
dbName
[
TSDB_DB_FNAME_LEN
];
}
SMqSubscribeObj
;
...
...
@@ -596,7 +604,7 @@ SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
SMqSubscribeObj
*
tCloneSubscribeObj
(
const
SMqSubscribeObj
*
pSub
);
void
tDeleteSubscribeObj
(
SMqSubscribeObj
*
pSub
);
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
);
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
);
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
,
int8_t
sver
);
typedef
struct
{
int32_t
epoch
;
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
933d08b4
...
...
@@ -23,7 +23,7 @@
#include "tcompare.h"
#include "tname.h"
#define MND_CONSUMER_VER_NUMBER
1
#define MND_CONSUMER_VER_NUMBER
2
#define MND_CONSUMER_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 6
...
...
@@ -391,12 +391,13 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
static
int32_t
mndProcessMqHbReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SMqHbReq
req
=
{
0
};
if
(
tDeserializeSMqHbReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
req
)
<
0
)
{
if
(
(
code
=
tDeserializeSMqHbReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
req
)
)
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
goto
end
;
}
int64_t
consumerId
=
req
.
consumerId
;
...
...
@@ -404,7 +405,8 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
if
(
pConsumer
==
NULL
)
{
mError
(
"consumer:0x%"
PRIx64
" not exist"
,
consumerId
);
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_EXIST
;
return
-
1
;
code
=
-
1
;
goto
end
;
}
atomic_store_32
(
&
pConsumer
->
hbStatus
,
0
);
...
...
@@ -424,9 +426,28 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
tmsgPutToQueue
(
&
pMnode
->
msgCb
,
WRITE_QUEUE
,
&
pRpcMsg
);
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
req
.
topics
);
i
++
){
TopicOffsetRows
*
data
=
taosArrayGet
(
req
.
topics
,
i
);
mDebug
(
"heartbeat report offset rows.%s:%s"
,
pConsumer
->
cgroup
,
data
->
topicName
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
data
->
topicName
);
taosWLockLatch
(
&
pSub
->
lock
);
SMqConsumerEp
*
pConsumerEp
=
taosHashGet
(
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
if
(
pConsumerEp
){
taosArrayDestroy
(
pConsumerEp
->
offsetRows
);
pConsumerEp
->
offsetRows
=
data
->
offsetRows
;
data
->
offsetRows
=
NULL
;
}
taosWUnLockLatch
(
&
pSub
->
lock
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
0
;
end:
tDeatroySMqHbReq
(
&
req
);
return
code
;
}
static
int32_t
mndProcessAskEpReq
(
SRpcMsg
*
pMsg
)
{
...
...
@@ -675,6 +696,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew
=
tNewSMqConsumerObj
(
consumerId
,
cgroup
);
tstrncpy
(
pConsumerNew
->
clientId
,
subscribe
.
clientId
,
tListLen
(
pConsumerNew
->
clientId
));
pConsumerNew
->
withTbName
=
subscribe
.
withTbName
;
pConsumerNew
->
useSnapshot
=
subscribe
.
useSnapshot
;
pConsumerNew
->
autoCommit
=
subscribe
.
autoCommit
;
pConsumerNew
->
autoCommitInterval
=
subscribe
.
autoCommitInterval
;
pConsumerNew
->
resetOffsetCfg
=
subscribe
.
resetOffsetCfg
;
// set the update type
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__REBALANCE
;
taosArrayDestroy
(
pConsumerNew
->
assignedTopics
);
...
...
@@ -822,7 +849,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
goto
CM_DECODE_OVER
;
}
if
(
sver
!=
MND_CONSUMER_VER_NUMBER
)
{
if
(
sver
<
1
||
sver
>
MND_CONSUMER_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
CM_DECODE_OVER
;
}
...
...
@@ -849,7 +876,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
CM_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CM_DECODE_OVER
);
if
(
tDecodeSMqConsumerObj
(
buf
,
pConsumer
)
==
NULL
)
{
if
(
tDecodeSMqConsumerObj
(
buf
,
pConsumer
,
sver
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
// TODO set correct error code
goto
CM_DECODE_OVER
;
}
...
...
@@ -1159,6 +1186,26 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
rebalanceTime
,
pConsumer
->
rebalanceTime
==
0
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
withTbName
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
useSnapshot
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
autoCommit
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
autoCommitInterval
,
false
);
char
buf
[
TSDB_OFFSET_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STqOffsetVal
pVal
=
{.
type
=
pConsumer
->
resetOffsetCfg
};
tFormatOffset
(
varDataVal
(
buf
),
TSDB_OFFSET_LEN
,
&
pVal
);
varDataSetLen
(
buf
,
strlen
(
varDataVal
(
buf
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
buf
,
false
);
numOfRows
++
;
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
933d08b4
...
...
@@ -321,10 +321,15 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen
+=
taosEncodeFixedI32
(
buf
,
0
);
}
tlen
+=
taosEncodeFixedI8
(
buf
,
pConsumer
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pConsumer
->
useSnapshot
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pConsumer
->
autoCommit
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
autoCommitInterval
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
resetOffsetCfg
);
return
tlen
;
}
void
*
tDecodeSMqConsumerObj
(
const
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
void
*
tDecodeSMqConsumerObj
(
const
void
*
buf
,
SMqConsumerObj
*
pConsumer
,
int8_t
sver
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
clientId
);
...
...
@@ -375,50 +380,96 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
taosArrayPush
(
pConsumer
->
assignedTopics
,
&
topic
);
}
return
(
void
*
)
buf
;
}
if
(
sver
>
1
){
buf
=
taosDecodeFixedI8
(
buf
,
&
pConsumer
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pConsumer
->
useSnapshot
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pConsumer
->
autoCommit
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
autoCommitInterval
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
resetOffsetCfg
);
}
SMqConsumerEp
*
tCloneSMqConsumerEp
(
const
SMqConsumerEp
*
pConsumerEpOld
)
{
SMqConsumerEp
*
pConsumerEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqConsumerEp
));
if
(
pConsumerEpNew
==
NULL
)
return
NULL
;
pConsumerEpNew
->
consumerId
=
pConsumerEpOld
->
consumerId
;
pConsumerEpNew
->
vgs
=
taosArrayDup
(
pConsumerEpOld
->
vgs
,
(
__array_item_dup_fn_t
)
tCloneSMqVgEp
);
return
pConsumerEpNew
;
return
(
void
*
)
buf
;
}
void
tDeleteSMqConsumerEp
(
void
*
data
)
{
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
data
;
taosArrayDestroyP
(
pConsumerEp
->
vgs
,
(
FDelete
)
tDeleteSMqVgEp
);
}
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
// return pConsumerEpNew;
//}
//
//void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
//}
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
const
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
taosEncodeArray
(
buf
,
pConsumerEp
->
vgs
,
(
FEncode
)
tEncodeSMqVgEp
);
#if 0
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
tlen += tEncodeSMqVgEp(buf, pVgEp);
int32_t
szVgs
=
taosArrayGetSize
(
pConsumerEp
->
offsetRows
);
tlen
+=
taosEncodeFixedI32
(
buf
,
szVgs
);
for
(
int32_t
j
=
0
;
j
<
szVgs
;
++
j
)
{
OffsetRows
*
offRows
=
taosArrayGet
(
pConsumerEp
->
offsetRows
,
j
);
tlen
+=
taosEncodeFixedI32
(
buf
,
offRows
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
rows
);
tlen
+=
taosEncodeFixedI8
(
buf
,
offRows
->
offset
.
type
);
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
offset
.
uid
);
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
offset
.
ts
);
}
else
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__LOG
)
{
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
offset
.
version
);
}
else
{
// do nothing
}
}
#endif
//#if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp);
// }
//#endif
return
tlen
;
}
void
*
tDecodeSMqConsumerEp
(
const
void
*
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
void
*
tDecodeSMqConsumerEp
(
const
void
*
buf
,
SMqConsumerEp
*
pConsumerEp
,
int8_t
sver
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeArray
(
buf
,
&
pConsumerEp
->
vgs
,
(
FDecode
)
tDecodeSMqVgEp
,
sizeof
(
SMqVgEp
));
#if 0
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
buf = tDecodeSMqVgEp(buf, pVgEp);
taosArrayPush(pConsumerEp->vgs, &pVgEp);
if
(
sver
>
1
){
int32_t
szVgs
=
0
;
buf
=
taosDecodeFixedI32
(
buf
,
&
szVgs
);
if
(
szVgs
>
0
){
pConsumerEp
->
offsetRows
=
taosArrayInit
(
szVgs
,
sizeof
(
OffsetRows
));
if
(
NULL
==
pConsumerEp
->
offsetRows
)
return
NULL
;
for
(
int32_t
j
=
0
;
j
<
szVgs
;
++
j
)
{
OffsetRows
*
offRows
=
taosArrayReserve
(
pConsumerEp
->
offsetRows
,
1
);
buf
=
taosDecodeFixedI32
(
buf
,
&
offRows
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
rows
);
buf
=
taosDecodeFixedI8
(
buf
,
&
offRows
->
offset
.
type
);
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
offset
.
uid
);
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
offset
.
ts
);
}
else
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__LOG
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
offset
.
version
);
}
else
{
// do nothing
}
}
}
}
#endif
//#if 0
// int32_t sz;
// buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
// buf = tDecodeSMqVgEp(buf, pVgEp);
// taosArrayPush(pConsumerEp->vgs, &pVgEp);
// }
//#endif
return
(
void
*
)
buf
;
}
...
...
@@ -468,6 +519,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
taosHashPut
(
pSubNew
->
consumerHash
,
&
newEp
.
consumerId
,
sizeof
(
int64_t
),
&
newEp
,
sizeof
(
SMqConsumerEp
));
}
pSubNew
->
unassignedVgs
=
taosArrayDup
(
pSub
->
unassignedVgs
,
(
__array_item_dup_fn_t
)
tCloneSMqVgEp
);
pSubNew
->
offsetRows
=
taosArrayDup
(
pSub
->
offsetRows
,
NULL
);
memcpy
(
pSubNew
->
dbName
,
pSub
->
dbName
,
TSDB_DB_FNAME_LEN
);
return
pSubNew
;
}
...
...
@@ -479,9 +531,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
if
(
pIter
==
NULL
)
break
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
taosArrayDestroyP
(
pConsumerEp
->
vgs
,
(
FDelete
)
tDeleteSMqVgEp
);
taosArrayDestroy
(
pConsumerEp
->
offsetRows
);
}
taosHashCleanup
(
pSub
->
consumerHash
);
taosArrayDestroyP
(
pSub
->
unassignedVgs
,
(
FDelete
)
tDeleteSMqVgEp
);
taosArrayDestroy
(
pSub
->
offsetRows
);
}
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
)
{
...
...
@@ -508,10 +562,27 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
if
(
cnt
!=
sz
)
return
-
1
;
tlen
+=
taosEncodeArray
(
buf
,
pSub
->
unassignedVgs
,
(
FEncode
)
tEncodeSMqVgEp
);
tlen
+=
taosEncodeString
(
buf
,
pSub
->
dbName
);
int32_t
szVgs
=
taosArrayGetSize
(
pSub
->
offsetRows
);
tlen
+=
taosEncodeFixedI32
(
buf
,
szVgs
);
for
(
int32_t
j
=
0
;
j
<
szVgs
;
++
j
)
{
OffsetRows
*
offRows
=
taosArrayGet
(
pSub
->
offsetRows
,
j
);
tlen
+=
taosEncodeFixedI32
(
buf
,
offRows
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
rows
);
tlen
+=
taosEncodeFixedI8
(
buf
,
offRows
->
offset
.
type
);
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
offset
.
uid
);
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
offset
.
ts
);
}
else
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__LOG
)
{
tlen
+=
taosEncodeFixedI64
(
buf
,
offRows
->
offset
.
version
);
}
else
{
// do nothing
}
}
return
tlen
;
}
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
)
{
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
,
int8_t
sver
)
{
//
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
key
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pSub
->
dbUid
);
...
...
@@ -526,74 +597,97 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
pSub
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
consumerEp
=
{
0
};
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
consumerEp
);
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
consumerEp
,
sver
);
taosHashPut
(
pSub
->
consumerHash
,
&
consumerEp
.
consumerId
,
sizeof
(
int64_t
),
&
consumerEp
,
sizeof
(
SMqConsumerEp
));
}
buf
=
taosDecodeArray
(
buf
,
&
pSub
->
unassignedVgs
,
(
FDecode
)
tDecodeSMqVgEp
,
sizeof
(
SMqVgEp
));
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
dbName
);
return
(
void
*
)
buf
;
}
SMqSubActionLogEntry
*
tCloneSMqSubActionLogEntry
(
SMqSubActionLogEntry
*
pEntry
)
{
SMqSubActionLogEntry
*
pEntryNew
=
taosMemoryMalloc
(
sizeof
(
SMqSubActionLogEntry
));
if
(
pEntryNew
==
NULL
)
return
NULL
;
pEntryNew
->
epoch
=
pEntry
->
epoch
;
pEntryNew
->
consumers
=
taosArrayDup
(
pEntry
->
consumers
,
(
__array_item_dup_fn_t
)
tCloneSMqConsumerEp
);
return
pEntryNew
;
}
void
tDeleteSMqSubActionLogEntry
(
SMqSubActionLogEntry
*
pEntry
)
{
taosArrayDestroyEx
(
pEntry
->
consumers
,
(
FDelete
)
tDeleteSMqConsumerEp
);
}
int32_t
tEncodeSMqSubActionLogEntry
(
void
**
buf
,
const
SMqSubActionLogEntry
*
pEntry
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pEntry
->
epoch
);
tlen
+=
taosEncodeArray
(
buf
,
pEntry
->
consumers
,
(
FEncode
)
tEncodeSMqSubActionLogEntry
);
return
tlen
;
}
void
*
tDecodeSMqSubActionLogEntry
(
const
void
*
buf
,
SMqSubActionLogEntry
*
pEntry
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pEntry
->
epoch
);
buf
=
taosDecodeArray
(
buf
,
&
pEntry
->
consumers
,
(
FDecode
)
tDecodeSMqSubActionLogEntry
,
sizeof
(
SMqSubActionLogEntry
));
return
(
void
*
)
buf
;
}
SMqSubActionLogObj
*
tCloneSMqSubActionLogObj
(
SMqSubActionLogObj
*
pLog
)
{
SMqSubActionLogObj
*
pLogNew
=
taosMemoryMalloc
(
sizeof
(
SMqSubActionLogObj
));
if
(
pLogNew
==
NULL
)
return
pLogNew
;
memcpy
(
pLogNew
->
key
,
pLog
->
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
pLogNew
->
logs
=
taosArrayDup
(
pLog
->
logs
,
(
__array_item_dup_fn_t
)
tCloneSMqConsumerEp
);
return
pLogNew
;
}
void
tDeleteSMqSubActionLogObj
(
SMqSubActionLogObj
*
pLog
)
{
taosArrayDestroyEx
(
pLog
->
logs
,
(
FDelete
)
tDeleteSMqConsumerEp
);
}
int32_t
tEncodeSMqSubActionLogObj
(
void
**
buf
,
const
SMqSubActionLogObj
*
pLog
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pLog
->
key
);
tlen
+=
taosEncodeArray
(
buf
,
pLog
->
logs
,
(
FEncode
)
tEncodeSMqSubActionLogEntry
);
return
tlen
;
}
void
*
tDecodeSMqSubActionLogObj
(
const
void
*
buf
,
SMqSubActionLogObj
*
pLog
)
{
buf
=
taosDecodeStringTo
(
buf
,
pLog
->
key
);
buf
=
taosDecodeArray
(
buf
,
&
pLog
->
logs
,
(
FDecode
)
tDecodeSMqSubActionLogEntry
,
sizeof
(
SMqSubActionLogEntry
));
if
(
sver
>
1
){
int32_t
szVgs
=
0
;
buf
=
taosDecodeFixedI32
(
buf
,
&
szVgs
);
if
(
szVgs
>
0
){
pSub
->
offsetRows
=
taosArrayInit
(
szVgs
,
sizeof
(
OffsetRows
));
if
(
NULL
==
pSub
->
offsetRows
)
return
NULL
;
for
(
int32_t
j
=
0
;
j
<
szVgs
;
++
j
)
{
OffsetRows
*
offRows
=
taosArrayReserve
(
pSub
->
offsetRows
,
1
);
buf
=
taosDecodeFixedI32
(
buf
,
&
offRows
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
rows
);
buf
=
taosDecodeFixedI8
(
buf
,
&
offRows
->
offset
.
type
);
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
offRows
->
offset
.
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
offset
.
uid
);
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
offset
.
ts
);
}
else
if
(
offRows
->
offset
.
type
==
TMQ_OFFSET__LOG
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
offRows
->
offset
.
version
);
}
else
{
// do nothing
}
}
}
}
return
(
void
*
)
buf
;
}
int32_t
tEncodeSMqOffsetObj
(
void
**
buf
,
const
SMqOffsetObj
*
pOffset
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pOffset
->
key
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pOffset
->
offset
);
return
tlen
;
}
void
*
tDecodeSMqOffsetObj
(
void
*
buf
,
SMqOffsetObj
*
pOffset
)
{
buf
=
taosDecodeStringTo
(
buf
,
pOffset
->
key
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pOffset
->
offset
);
return
buf
;
}
//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL;
// pEntryNew->epoch = pEntry->epoch;
// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pEntryNew;
//}
//
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
//}
//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
// int32_t tlen = 0;
// tlen += taosEncodeFixedI32(buf, pEntry->epoch);
// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen;
//}
//
//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
// buf = taosDecodeFixedI32(buf, &pEntry->epoch);
// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf;
//}
//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
// if (pLogNew == NULL) return pLogNew;
// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pLogNew;
//}
//
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
//}
//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
// int32_t tlen = 0;
// tlen += taosEncodeString(buf, pLog->key);
// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen;
//}
//
//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
// buf = taosDecodeStringTo(buf, pLog->key);
// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf;
//}
//
//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
// int32_t tlen = 0;
// tlen += taosEncodeString(buf, pOffset->key);
// tlen += taosEncodeFixedI64(buf, pOffset->offset);
// return tlen;
//}
//
//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
// buf = taosDecodeStringTo(buf, pOffset->key);
// buf = taosDecodeFixedI64(buf, &pOffset->offset);
// return buf;
//}
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
933d08b4
...
...
@@ -24,7 +24,7 @@
#include "tcompare.h"
#include "tname.h"
#define MND_SUBSCRIBE_VER_NUMBER
1
#define MND_SUBSCRIBE_VER_NUMBER
2
#define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_CNT 3
...
...
@@ -255,7 +255,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
for
(
int32_t
i
=
0
;
i
<
numOfNewConsumers
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pInput
->
pRebInfo
->
newConsumers
,
i
);
SMqConsumerEp
newConsumerEp
;
SMqConsumerEp
newConsumerEp
=
{
0
}
;
newConsumerEp
.
consumerId
=
consumerId
;
newConsumerEp
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
@@ -449,8 +449,44 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SMqRebOutputVg
*
pRebOutput
=
(
SMqRebOutputVg
*
)
pRemovedIter
;
taosArrayPush
(
pOutput
->
rebVgs
,
pRebOutput
);
if
(
taosHashGetSize
(
pOutput
->
pSub
->
consumerHash
)
==
0
){
// if all consumer is removed, put all vg into unassigned
taosArrayPush
(
pOutput
->
pSub
->
unassignedVgs
,
&
pRebOutput
->
pVgEp
);
if
(
taosHashGetSize
(
pOutput
->
pSub
->
consumerHash
)
==
0
){
// if all consumer is removed
taosArrayPush
(
pOutput
->
pSub
->
unassignedVgs
,
&
pRebOutput
->
pVgEp
);
// put all vg into unassigned
}
}
if
(
taosHashGetSize
(
pOutput
->
pSub
->
consumerHash
)
==
0
)
{
// if all consumer is removed
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribeByKey
(
pMnode
,
pInput
->
pRebInfo
->
key
);
// put all offset rows
if
(
pSub
)
{
taosRLockLatch
(
&
pSub
->
lock
);
bool
init
=
false
;
if
(
pOutput
->
pSub
->
offsetRows
==
NULL
)
{
pOutput
->
pSub
->
offsetRows
=
taosArrayInit
(
4
,
sizeof
(
OffsetRows
));
init
=
true
;
}
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
if
(
init
)
{
taosArrayAddAll
(
pOutput
->
pSub
->
offsetRows
,
pConsumerEp
->
offsetRows
);
// mDebug("pSub->offsetRows is init");
}
else
{
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pConsumerEp
->
offsetRows
);
j
++
)
{
OffsetRows
*
d1
=
taosArrayGet
(
pConsumerEp
->
offsetRows
,
j
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
pSub
->
offsetRows
);
i
++
)
{
OffsetRows
*
d2
=
taosArrayGet
(
pOutput
->
pSub
->
offsetRows
,
i
);
if
(
d1
->
vgId
==
d2
->
vgId
)
{
d2
->
rows
+=
d1
->
rows
;
d2
->
offset
=
d1
->
offset
;
// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
}
}
}
}
}
taosRUnLockLatch
(
&
pSub
->
lock
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
}
...
...
@@ -809,7 +845,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
SUB_DECODE_OVER
;
if
(
sver
!=
MND_SUBSCRIBE_VER_NUMBER
)
{
if
(
sver
>
MND_SUBSCRIBE_VER_NUMBER
||
sver
<
1
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
SUB_DECODE_OVER
;
}
...
...
@@ -828,7 +864,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
SUB_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_SUBSCRIBE_RESERVE_SIZE
,
SUB_DECODE_OVER
);
if
(
tDecodeSubscribeObj
(
buf
,
pSub
)
==
NULL
)
{
if
(
tDecodeSubscribeObj
(
buf
,
pSub
,
sver
)
==
NULL
)
{
goto
SUB_DECODE_OVER
;
}
...
...
@@ -890,6 +926,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
pOldSub
->
unassignedVgs
=
pNewSub
->
unassignedVgs
;
pNewSub
->
unassignedVgs
=
tmp1
;
SArray
*
tmp2
=
pOldSub
->
offsetRows
;
pOldSub
->
offsetRows
=
pNewSub
->
offsetRows
;
pNewSub
->
offsetRows
=
tmp2
;
taosWUnLockLatch
(
&
pOldSub
->
lock
);
return
0
;
}
...
...
@@ -1028,6 +1068,61 @@ END:
return
code
;
}
static
int32_t
buildResult
(
SSDataBlock
*
pBlock
,
int32_t
*
numOfRows
,
int64_t
consumerId
,
const
char
*
topic
,
const
char
*
cgroup
,
SArray
*
vgs
,
SArray
*
offsetRows
){
int32_t
sz
=
taosArrayGetSize
(
vgs
);
for
(
int32_t
j
=
0
;
j
<
sz
;
j
++
)
{
SMqVgEp
*
pVgEp
=
taosArrayGetP
(
vgs
,
j
);
SColumnInfoData
*
pColInfo
;
int32_t
cols
=
0
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
topic
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
cgroup
,
false
);
// vg id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
&
pVgEp
->
vgId
,
false
);
// consumer id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
&
consumerId
,
consumerId
==
-
1
);
mDebug
(
"mnd show subscriptions: topic %s, consumer:0x%"
PRIx64
" cgroup %s vgid %d"
,
varDataVal
(
topic
),
consumerId
,
varDataVal
(
cgroup
),
pVgEp
->
vgId
);
// offset
OffsetRows
*
data
=
NULL
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
offsetRows
);
i
++
){
OffsetRows
*
tmp
=
taosArrayGet
(
offsetRows
,
i
);
if
(
tmp
->
vgId
!=
pVgEp
->
vgId
){
continue
;
}
data
=
tmp
;
}
if
(
data
){
// vg id
char
buf
[
TSDB_OFFSET_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tFormatOffset
(
varDataVal
(
buf
),
TSDB_OFFSET_LEN
,
&
data
->
offset
);
varDataSetLen
(
buf
,
strlen
(
varDataVal
(
buf
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
buf
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
&
data
->
rows
,
false
);
}
else
{
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetNULL
(
pColInfo
,
*
numOfRows
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetNULL
(
pColInfo
,
*
numOfRows
);
mError
(
"mnd show subscriptions: do not find vgId:%d in offsetRows"
,
pVgEp
->
vgId
);
}
(
*
numOfRows
)
++
;
}
return
0
;
}
int32_t
mndRetrieveSubscribe
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
@@ -1048,6 +1143,13 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
blockDataEnsureCapacity
(
pBlock
,
numOfRows
+
pSub
->
vgNum
);
}
// topic and cgroup
char
topic
[
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
cgroup
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndSplitSubscribeKey
(
pSub
->
key
,
varDataVal
(
topic
),
varDataVal
(
cgroup
),
false
);
varDataSetLen
(
topic
,
strlen
(
varDataVal
(
topic
)));
varDataSetLen
(
cgroup
,
strlen
(
varDataVal
(
cgroup
)));
SMqConsumerEp
*
pConsumerEp
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
...
...
@@ -1055,97 +1157,11 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
if
(
pIter
==
NULL
)
break
;
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
int32_t
sz
=
taosArrayGetSize
(
pConsumerEp
->
vgs
);
for
(
int32_t
j
=
0
;
j
<
sz
;
j
++
)
{
SMqVgEp
*
pVgEp
=
taosArrayGetP
(
pConsumerEp
->
vgs
,
j
);
SColumnInfoData
*
pColInfo
;
int32_t
cols
=
0
;
// topic and cgroup
char
topic
[
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
cgroup
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndSplitSubscribeKey
(
pSub
->
key
,
varDataVal
(
topic
),
varDataVal
(
cgroup
),
false
);
varDataSetLen
(
topic
,
strlen
(
varDataVal
(
topic
)));
varDataSetLen
(
cgroup
,
strlen
(
varDataVal
(
cgroup
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
topic
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
cgroup
,
false
);
// vg id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgEp
->
vgId
,
false
);
// consumer id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumerEp
->
consumerId
,
false
);
mDebug
(
"mnd show subscriptions: topic %s, consumer:0x%"
PRIx64
" cgroup %s vgid %d"
,
varDataVal
(
topic
),
pConsumerEp
->
consumerId
,
varDataVal
(
cgroup
),
pVgEp
->
vgId
);
// offset
#if 0
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif
numOfRows
++
;
}
buildResult
(
pBlock
,
&
numOfRows
,
pConsumerEp
->
consumerId
,
topic
,
cgroup
,
pConsumerEp
->
vgs
,
pConsumerEp
->
offsetRows
);
}
// do not show for cleared subscription
int32_t
sz
=
taosArrayGetSize
(
pSub
->
unassignedVgs
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqVgEp
*
pVgEp
=
taosArrayGetP
(
pSub
->
unassignedVgs
,
i
);
SColumnInfoData
*
pColInfo
;
int32_t
cols
=
0
;
// topic and cgroup
char
topic
[
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
cgroup
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndSplitSubscribeKey
(
pSub
->
key
,
varDataVal
(
topic
),
varDataVal
(
cgroup
),
false
);
varDataSetLen
(
topic
,
strlen
(
varDataVal
(
topic
)));
varDataSetLen
(
cgroup
,
strlen
(
varDataVal
(
cgroup
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
topic
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
cgroup
,
false
);
// vg id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgEp
->
vgId
,
false
);
// consumer id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
NULL
,
true
);
mDebug
(
"mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d"
,
varDataVal
(
topic
),
varDataVal
(
cgroup
),
pVgEp
->
vgId
);
// offset
#if 0
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif
numOfRows
++
;
}
buildResult
(
pBlock
,
&
numOfRows
,
-
1
,
topic
,
cgroup
,
pSub
->
unassignedVgs
,
pSub
->
offsetRows
);
pBlock
->
info
.
rows
=
numOfRows
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
933d08b4
...
...
@@ -243,8 +243,8 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
tqDoSendDataRsp
(
&
pHandle
->
msg
->
info
,
&
dataRsp
,
pHandle
->
epoch
,
pHandle
->
consumerId
,
TMQ_MSG_TYPE__POLL_RSP
,
sver
,
ever
);
char
buf1
[
80
]
=
{
0
};
char
buf2
[
80
]
=
{
0
};
char
buf1
[
TSDB_OFFSET_LEN
]
=
{
0
};
char
buf2
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf1
,
tListLen
(
buf1
),
&
dataRsp
.
reqOffset
);
tFormatOffset
(
buf2
,
tListLen
(
buf2
),
&
dataRsp
.
rspOffset
);
tqDebug
(
"vgId:%d, from consumer:0x%"
PRIx64
" (epoch %d) push rsp, block num: %d, req:%s, rsp:%s"
,
vgId
,
...
...
@@ -259,10 +259,10 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
tqDoSendDataRsp
(
&
pMsg
->
info
,
pRsp
,
pReq
->
epoch
,
pReq
->
consumerId
,
type
,
sver
,
ever
);
char
buf1
[
80
]
=
{
0
};
char
buf2
[
80
]
=
{
0
};
tFormatOffset
(
buf1
,
80
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
80
,
&
pRsp
->
rspOffset
);
char
buf1
[
TSDB_OFFSET_LEN
]
=
{
0
};
char
buf2
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf1
,
TSDB_OFFSET_LEN
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
TSDB_OFFSET_LEN
,
&
pRsp
->
rspOffset
);
tqDebug
(
"vgId:%d consumer:0x%"
PRIx64
" (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"
PRIx64
,
vgId
,
pReq
->
consumerId
,
pReq
->
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
,
pReq
->
reqId
);
...
...
@@ -481,8 +481,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pHandle
->
epoch
=
reqEpoch
;
}
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
reqOffset
);
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
reqOffset
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%"
PRIx64
,
consumerId
,
req
.
epoch
,
pHandle
->
subKey
,
vgId
,
buf
,
req
.
reqId
);
...
...
@@ -559,7 +559,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
}
else
{
dataRsp
.
rspOffset
.
version
=
currentVer
;
// return current consume offset value
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIE
A
ST
)
{
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
dataRsp
.
rspOffset
.
version
=
sver
;
// not consume yet, set the earliest position
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
dataRsp
.
rspOffset
.
version
=
ever
;
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
933d08b4
...
...
@@ -99,15 +99,15 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if
(
pOffset
!=
NULL
)
{
*
pOffsetVal
=
pOffset
->
val
;
char
formatBuf
[
80
];
tFormatOffset
(
formatBuf
,
80
,
pOffsetVal
);
char
formatBuf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
formatBuf
,
TSDB_OFFSET_LEN
,
pOffsetVal
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"
PRIx64
,
consumerId
,
pHandle
->
subKey
,
vgId
,
formatBuf
,
pRequest
->
reqId
);
return
0
;
}
else
{
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIE
A
ST
)
{
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
if
(
pRequest
->
useSnapshot
)
{
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey:%s, vgId:%d, (earliest) set offset to be snapshot"
,
consumerId
,
pHandle
->
subKey
,
vgId
);
...
...
@@ -186,8 +186,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
dataRsp
,
TMQ_MSG_TYPE__POLL_RSP
,
vgId
);
end
:
{
char
buf
[
80
]
=
{
0
};
tFormatOffset
(
buf
,
80
,
&
dataRsp
.
rspOffset
);
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
dataRsp
.
rspOffset
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%"
PRIx64
" code:%d"
,
consumerId
,
pHandle
->
subKey
,
vgId
,
dataRsp
.
blockNum
,
buf
,
pRequest
->
reqId
,
code
);
...
...
tests/system-test/2-query/odbc.py
浏览文件 @
933d08b4
...
...
@@ -22,8 +22,8 @@ class TDTestCase:
tdSql
.
execute
(
"insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)"
)
tdSql
.
query
(
"select count(*) from information_schema.ins_columns"
)
# enterprise version: 2
88, community version: 280
tdSql
.
checkData
(
0
,
0
,
2
88
)
# enterprise version: 2
95, community version: 285
tdSql
.
checkData
(
0
,
0
,
2
95
)
tdSql
.
query
(
"select * from information_schema.ins_columns where table_name = 'ntb'"
)
tdSql
.
checkRows
(
14
)
...
...
tests/system-test/7-tmq/checkOffsetRowParams.py
0 → 100644
浏览文件 @
933d08b4
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
actionType
(
Enum
):
CREATE_DATABASE
=
0
CREATE_STABLE
=
1
CREATE_CTABLE
=
2
INSERT_DATA
=
3
class
TDTestCase
:
hostname
=
socket
.
gethostname
()
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
or
"taosd.exe"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
newcur
(
self
,
cfg
,
host
,
port
):
user
=
"root"
password
=
"taosdata"
con
=
taos
.
connect
(
host
=
host
,
user
=
user
,
password
=
password
,
config
=
cfg
,
port
=
port
)
cur
=
con
.
cursor
()
print
(
cur
)
return
cur
def
initConsumerTable
(
self
,
cdbName
=
'cdb'
):
tdLog
.
info
(
"create consume database, and consume info table, and consume result table"
)
tdSql
.
query
(
"create database if not exists %s vgroups 1 wal_retention_period 3600"
%
(
cdbName
))
tdSql
.
query
(
"drop table if exists %s.consumeinfo "
%
(
cdbName
))
tdSql
.
query
(
"drop table if exists %s.consumeresult "
%
(
cdbName
))
tdSql
.
query
(
"create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"
%
cdbName
)
tdSql
.
query
(
"create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"
%
cdbName
)
def
initConsumerInfoTable
(
self
,
cdbName
=
'cdb'
):
tdLog
.
info
(
"drop consumeinfo table"
)
tdSql
.
query
(
"drop table if exists %s.consumeinfo "
%
(
cdbName
))
tdSql
.
query
(
"create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"
%
cdbName
)
def
insertConsumerInfo
(
self
,
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifmanualcommit
,
cdbName
=
'cdb'
):
sql
=
"insert into %s.consumeinfo values "
%
cdbName
sql
+=
"(now, %d, '%s', '%s', %d, %d, %d)"
%
(
consumerId
,
topicList
,
keyList
,
expectrowcnt
,
ifcheckdata
,
ifmanualcommit
)
tdLog
.
info
(
"consume info sql: %s"
%
sql
)
tdSql
.
query
(
sql
)
def
selectConsumeResult
(
self
,
expectRows
,
cdbName
=
'cdb'
):
resultList
=
[]
while
1
:
tdSql
.
query
(
"select * from %s.consumeresult"
%
cdbName
)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if
tdSql
.
getRows
()
==
expectRows
:
break
else
:
time
.
sleep
(
5
)
for
i
in
range
(
expectRows
):
tdLog
.
info
(
"consume id: %d, consume msgs: %d, consume rows: %d"
%
(
tdSql
.
getData
(
i
,
1
),
tdSql
.
getData
(
i
,
2
),
tdSql
.
getData
(
i
,
3
)))
resultList
.
append
(
tdSql
.
getData
(
i
,
3
))
return
resultList
def
startTmqSimProcess
(
self
,
buildPath
,
cfgPath
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
):
if
valgrind
==
1
:
logFile
=
cfgPath
+
'/../log/valgrind-tmq.log'
shellCmd
=
'nohup valgrind --log-file='
+
logFile
shellCmd
+=
'--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if
(
platform
.
system
().
lower
()
==
'windows'
):
shellCmd
=
'mintty -h never -w hide '
+
buildPath
+
'
\\
build
\\
bin
\\
tmq_sim.exe -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
"> nul 2>&1 &"
else
:
shellCmd
=
'nohup '
+
buildPath
+
'/build/bin/tmq_sim -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
"> /dev/null 2>&1 &"
tdLog
.
info
(
shellCmd
)
os
.
system
(
shellCmd
)
def
create_database
(
self
,
tsql
,
dbName
,
dropFlag
=
1
,
vgroups
=
4
,
replica
=
1
):
if
dropFlag
==
1
:
tsql
.
execute
(
"drop database if exists %s"
%
(
dbName
))
tsql
.
execute
(
"create database if not exists %s vgroups %d replica %d wal_retention_period 3600"
%
(
dbName
,
vgroups
,
replica
))
tdLog
.
debug
(
"complete to create database %s"
%
(
dbName
))
return
def
create_stable
(
self
,
tsql
,
dbName
,
stbName
):
tsql
.
execute
(
"create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"
%
(
dbName
,
stbName
))
tdLog
.
debug
(
"complete to create %s.%s"
%
(
dbName
,
stbName
))
return
def
create_ctables
(
self
,
tsql
,
dbName
,
stbName
,
ctbNum
):
tsql
.
execute
(
"use %s"
%
dbName
)
pre_create
=
"create table"
sql
=
pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for
i
in
range
(
ctbNum
):
sql
+=
" %s_%d using %s tags(%d)"
%
(
stbName
,
i
,
stbName
,
i
+
1
)
if
(
i
>
0
)
and
(
i
%
100
==
0
):
tsql
.
execute
(
sql
)
sql
=
pre_create
if
sql
!=
pre_create
:
tsql
.
execute
(
sql
)
tdLog
.
debug
(
"complete to create %d child tables in %s.%s"
%
(
ctbNum
,
dbName
,
stbName
))
return
def
insert_data
(
self
,
tsql
,
dbName
,
stbName
,
ctbNum
,
rowsPerTbl
,
batchNum
,
startTs
=
0
):
tdLog
.
debug
(
"start to insert data ............"
)
tsql
.
execute
(
"use %s"
%
dbName
)
pre_insert
=
"insert into "
sql
=
pre_insert
if
startTs
==
0
:
t
=
time
.
time
()
startTs
=
int
(
round
(
t
*
1000
))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql
=
0
for
i
in
range
(
ctbNum
):
sql
+=
" %s_%d values "
%
(
stbName
,
i
)
for
j
in
range
(
rowsPerTbl
):
sql
+=
"(%d, %d, 'tmqrow_%d') "
%
(
startTs
+
j
,
j
,
j
)
rowsOfSql
+=
1
if
(
j
>
0
)
and
((
rowsOfSql
==
batchNum
)
or
(
j
==
rowsPerTbl
-
1
)):
tsql
.
execute
(
sql
)
rowsOfSql
=
0
if
j
<
rowsPerTbl
-
1
:
sql
=
"insert into %s_%d values "
%
(
stbName
,
i
)
else
:
sql
=
"insert into "
#end sql
if
sql
!=
pre_insert
:
#print("insert sql:%s"%sql)
tsql
.
execute
(
sql
)
tdLog
.
debug
(
"insert data ............ [OK]"
)
return
def
prepareEnv
(
self
,
**
parameterDict
):
# create new connector for my thread
tsql
=
self
.
newcur
(
parameterDict
[
'cfg'
],
'localhost'
,
6030
)
if
parameterDict
[
"actionType"
]
==
actionType
.
CREATE_DATABASE
:
self
.
create_database
(
tsql
,
parameterDict
[
"dbName"
])
elif
parameterDict
[
"actionType"
]
==
actionType
.
CREATE_STABLE
:
self
.
create_stable
(
tsql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
])
elif
parameterDict
[
"actionType"
]
==
actionType
.
CREATE_CTABLE
:
self
.
create_ctables
(
tsql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
])
elif
parameterDict
[
"actionType"
]
==
actionType
.
INSERT_DATA
:
self
.
insert_data
(
tsql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
],
\
parameterDict
[
"rowsPerTbl"
],
parameterDict
[
"batchNum"
])
else
:
tdLog
.
exit
(
"not support's action: "
,
parameterDict
[
"actionType"
])
return
def
tmqCase1
(
self
,
cfgPath
,
buildPath
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
self
.
initConsumerTable
()
# create and start thread
parameterDict
=
{
'cfg'
:
''
,
\
'actionType'
:
0
,
\
'dbName'
:
'db1'
,
\
'dropFlag'
:
1
,
\
'vgroups'
:
4
,
\
'replica'
:
1
,
\
'stbName'
:
'stb1'
,
\
'ctbNum'
:
10
,
\
'rowsPerTbl'
:
10000
,
\
'batchNum'
:
100
,
\
'startTs'
:
1640966400000
}
# 2022-01-01 00:00:00.000
self
.
create_database
(
tdSql
,
parameterDict
[
"dbName"
])
self
.
create_stable
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
tdSql
.
execute
(
"create topic %s as select ts, c1, c2 from %s.%s"
%
(
topicFromStb1
,
parameterDict
[
'dbName'
],
parameterDict
[
'stbName'
]))
consumerId
=
0
expectrowcnt
=
parameterDict
[
"rowsPerTbl"
]
*
parameterDict
[
"ctbNum"
]
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:2000,\
auto.offset.reset:earliest'
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
pollDelay
=
20
showMsg
=
1
showRow
=
1
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
tdLog
.
info
(
"start show subscriptions 1"
)
while
(
1
):
tdSql
.
query
(
"show subscriptions"
)
if
(
tdSql
.
getRows
()
==
0
):
tdLog
.
info
(
"sleep"
)
time
.
sleep
(
1
)
elif
(
tdSql
.
queryResult
[
0
][
4
]
!=
None
):
# tdSql.checkData(0, 4, "earliest")
tdSql
.
checkData
(
0
,
5
,
0
)
break
time
.
sleep
(
2
)
tdLog
.
info
(
"start insert data"
)
self
.
create_ctables
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
])
self
.
insert_data
(
tdSql
,
\
parameterDict
[
"dbName"
],
\
parameterDict
[
"stbName"
],
\
parameterDict
[
"ctbNum"
],
\
parameterDict
[
"rowsPerTbl"
],
\
parameterDict
[
"batchNum"
])
time
.
sleep
(
2
)
tdLog
.
info
(
"start show subscriptions 2"
)
tdSql
.
query
(
"show subscriptions"
)
tdSql
.
checkRows
(
4
)
print
(
tdSql
.
queryResult
)
# tdSql.checkData(0, 4, 'offset(log) ver:103')
tdSql
.
checkData
(
0
,
5
,
10000
)
# tdSql.checkData(1, 4, 'offset(log) ver:103')
tdSql
.
checkData
(
1
,
5
,
10000
)
# tdSql.checkData(2, 4, 'offset(log) ver:303')
tdSql
.
checkData
(
2
,
5
,
50000
)
# tdSql.checkData(3, 4, 'offset(log) ver:239')
tdSql
.
checkData
(
3
,
5
,
30000
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
self
.
selectConsumeResult
(
expectRows
)
time
.
sleep
(
2
)
tdLog
.
info
(
"start show subscriptions 3"
)
tdSql
.
query
(
"show subscriptions"
)
tdSql
.
checkRows
(
4
)
print
(
tdSql
.
queryResult
)
tdSql
.
checkData
(
0
,
3
,
None
)
# tdSql.checkData(0, 4, 'offset(log) ver:103')
tdSql
.
checkData
(
0
,
5
,
10000
)
# tdSql.checkData(1, 4, 'offset(log) ver:103')
tdSql
.
checkData
(
1
,
5
,
10000
)
# tdSql.checkData(2, 4, 'offset(log) ver:303')
tdSql
.
checkData
(
2
,
5
,
50000
)
# tdSql.checkData(3, 4, 'offset(log) ver:239')
tdSql
.
checkData
(
3
,
5
,
30000
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
buildPath
=
self
.
getBuildPath
()
if
(
buildPath
==
""
):
tdLog
.
exit
(
"taosd not found!"
)
else
:
tdLog
.
info
(
"taosd found in %s"
%
buildPath
)
cfgPath
=
buildPath
+
"/../sim/psim/cfg"
tdLog
.
info
(
"cfgPath: %s"
%
cfgPath
)
self
.
tmqCase1
(
cfgPath
,
buildPath
)
# self.tmqCase2(cfgPath, buildPath)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录