Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6657b8d8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6657b8d8
编写于
6月 12, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:modify offset description to string
上级
aa610b27
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
25 addition
and
21 deletion
+25
-21
include/common/tmsg.h
include/common/tmsg.h
+1
-1
include/util/tdef.h
include/util/tdef.h
+1
-1
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+2
-3
source/common/src/systable.c
source/common/src/systable.c
+5
-5
source/common/src/tmsg.c
source/common/src/tmsg.c
+7
-7
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+6
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-2
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
6657b8d8
...
...
@@ -2901,7 +2901,7 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pRe
// tqOffset
enum
{
TMQ_OFFSET__RESET_NONE
=
-
3
,
TMQ_OFFSET__RESET_EARLIE
A
ST
=
-
2
,
TMQ_OFFSET__RESET_EARLIEST
=
-
2
,
TMQ_OFFSET__RESET_LATEST
=
-
1
,
TMQ_OFFSET__LOG
=
1
,
TMQ_OFFSET__SNAPSHOT_DATA
=
2
,
...
...
include/util/tdef.h
浏览文件 @
6657b8d8
...
...
@@ -195,7 +195,7 @@ typedef enum ELogicConditionType {
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
#define TSDB_OFFSET_LEN
80
// it is a null-terminated string
#define TSDB_OFFSET_LEN
64
// it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
...
...
source/client/src/clientTmq.c
浏览文件 @
6657b8d8
...
...
@@ -264,7 +264,7 @@ tmq_conf_t* tmq_conf_new() {
conf
->
withTbName
=
false
;
conf
->
autoCommit
=
true
;
conf
->
autoCommitInterval
=
DEFAULT_AUTO_COMMIT_INTERVAL
;
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIE
A
ST
;
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIEST
;
conf
->
hbBgEnable
=
true
;
return
conf
;
...
...
@@ -318,7 +318,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
conf
->
resetOffset
=
TMQ_OFFSET__RESET_NONE
;
return
TMQ_CONF_OK
;
}
else
if
(
strcasecmp
(
value
,
"earliest"
)
==
0
)
{
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIE
A
ST
;
conf
->
resetOffset
=
TMQ_OFFSET__RESET_EARLIEST
;
return
TMQ_CONF_OK
;
}
else
if
(
strcasecmp
(
value
,
"latest"
)
==
0
)
{
conf
->
resetOffset
=
TMQ_OFFSET__RESET_LATEST
;
...
...
@@ -809,7 +809,6 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows
->
vgId
=
pVg
->
vgId
;
offRows
->
rows
=
pVg
->
numOfRows
;
offRows
->
offset
=
pVg
->
offsetInfo
.
committedOffset
;
tscDebug
(
"report row:%lldd, offset:%"
PRId64
,
offRows
->
rows
,
offRows
->
offset
.
version
);
}
}
...
...
source/common/src/systable.c
浏览文件 @
6657b8d8
...
...
@@ -361,11 +361,11 @@ static const SSysDbTableSchema consumerSchema[] = {
{.
name
=
"up_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"subscribe_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"rebalance_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"
withTbN
ame"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"
useSnapshot
"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"
autoC
ommit"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"auto
CommitInterval
"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"
resetOffsetCfg"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"
msg.with.table.n
ame"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"
experimental.snapshot.enable
"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"
enable.auto.c
ommit"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
false
},
{.
name
=
"auto
.commit.interval.ms
"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"
auto.offset.reset"
,
.
bytes
=
TSDB_OFFSET_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
,
.
sysInfo
=
false
},
};
static
const
SSysDbTableSchema
offsetSchema
[]
=
{
...
...
source/common/src/tmsg.c
浏览文件 @
6657b8d8
...
...
@@ -7131,15 +7131,15 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
int32_t
tFormatOffset
(
char
*
buf
,
int32_t
maxLen
,
const
STqOffsetVal
*
pVal
)
{
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_NONE
)
{
snprintf
(
buf
,
maxLen
,
"
offset(reset to none)
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_EARLIE
A
ST
)
{
snprintf
(
buf
,
maxLen
,
"
offset(reset to earlieast)
"
);
snprintf
(
buf
,
maxLen
,
"
none
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
snprintf
(
buf
,
maxLen
,
"
earliest
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__RESET_LATEST
)
{
snprintf
(
buf
,
maxLen
,
"
offset(reset to latest)
"
);
snprintf
(
buf
,
maxLen
,
"
latest
"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__LOG
)
{
snprintf
(
buf
,
maxLen
,
"
offset(log) ver
:%"
PRId64
,
pVal
->
version
);
snprintf
(
buf
,
maxLen
,
"
log
:%"
PRId64
,
pVal
->
version
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
snprintf
(
buf
,
maxLen
,
"
offset(snapshot) uid:%"
PRId64
" ts:
%"
PRId64
,
pVal
->
uid
,
pVal
->
ts
);
snprintf
(
buf
,
maxLen
,
"
snapshot:%"
PRId64
"|
%"
PRId64
,
pVal
->
uid
,
pVal
->
ts
);
}
else
{
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -7157,7 +7157,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
return
pLeft
->
uid
==
pRight
->
uid
;
}
else
{
ASSERT
(
0
);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIE
A
ST ||*/
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
/*return true;*/
}
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
6657b8d8
...
...
@@ -1197,8 +1197,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
autoCommitInterval
,
false
);
char
buf
[
TSDB_OFFSET_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STqOffsetVal
pVal
=
{.
type
=
pConsumer
->
resetOffsetCfg
};
tFormatOffset
(
varDataVal
(
buf
),
TSDB_OFFSET_LEN
,
&
pVal
);
varDataSetLen
(
buf
,
strlen
(
varDataVal
(
buf
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
resetOffsetCfg
,
false
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
buf
,
false
);
numOfRows
++
;
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
6657b8d8
...
...
@@ -559,7 +559,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
}
else
{
dataRsp
.
rspOffset
.
version
=
currentVer
;
// return current consume offset value
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIE
A
ST
)
{
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
dataRsp
.
rspOffset
.
version
=
sver
;
// not consume yet, set the earliest position
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
dataRsp
.
rspOffset
.
version
=
ever
;
...
...
@@ -754,7 +754,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosArrayDestroy
(
tbUidList
);
}
taosHashPut
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
id
taosHashPut
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
tqDebug
(
"try to persist handle %s consumer:0x%"
PRIx64
,
req
.
subKey
,
pHandle
->
consumerId
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
goto
end
;
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
6657b8d8
...
...
@@ -107,7 +107,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return
0
;
}
else
{
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIE
A
ST
)
{
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
if
(
pRequest
->
useSnapshot
)
{
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey:%s, vgId:%d, (earliest) set offset to be snapshot"
,
consumerId
,
pHandle
->
subKey
,
vgId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录