Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
103b5743
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
103b5743
编写于
5月 20, 2021
作者:
V
von gosling
提交者:
GitHub
5月 20, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Revert "Revert "[ISSUE #2865] Batch message send bug fix (#2866)" (#2912)"
This reverts commit
129d7e2b
.
上级
129d7e2b
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
29 addition
and
29 deletion
+29
-29
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
...a/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+29
-29
未找到文件。
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
浏览文件 @
103b5743
...
...
@@ -426,17 +426,18 @@ public class DLedgerCommitLog extends CommitLog {
AppendFuture
<
AppendEntryResponse
>
dledgerFuture
;
EncodeResult
encodeResult
;
encodeResult
=
this
.
messageSerializer
.
serialize
(
msg
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
long
elapsedTimeInLock
;
long
queueOffset
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
encodeResult
=
this
.
messageSerializer
.
serialize
(
msg
);
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
false
);
AppendEntryRequest
request
=
new
AppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -542,6 +543,12 @@ public class DLedgerCommitLog extends CommitLog {
BatchAppendFuture
<
AppendEntryResponse
>
dledgerFuture
;
EncodeResult
encodeResult
;
encodeResult
=
this
.
messageSerializer
.
serialize
(
messageExtBatch
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
msgIdBuilder
.
setLength
(
0
);
long
elapsedTimeInLock
;
...
...
@@ -549,12 +556,8 @@ public class DLedgerCommitLog extends CommitLog {
long
msgNum
=
0
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
encodeResult
=
this
.
messageSerializer
.
serialize
(
messageExtBatch
);
queueOffset
=
topicQueueTable
.
get
(
encodeResult
.
queueOffsetKey
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
true
);
BatchAppendEntryRequest
request
=
new
BatchAppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -664,7 +667,7 @@ public class DLedgerCommitLog extends CommitLog {
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
false
);
AppendEntryRequest
request
=
new
AppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -779,7 +782,8 @@ public class DLedgerCommitLog extends CommitLog {
long
msgNum
=
0
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
queueOffset
=
topicQueueTable
.
get
(
encodeResult
.
queueOffsetKey
);
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
true
);
BatchAppendEntryRequest
request
=
new
BatchAppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
...
...
@@ -957,8 +961,15 @@ public class DLedgerCommitLog extends CommitLog {
this
.
queueOffsetKey
=
queueOffsetKey
;
}
public
void
setQueueOffsetKey
(
long
offset
)
{
data
.
putLong
(
MessageDecoder
.
QUEUE_OFFSET_POSITION
,
offset
);
public
void
setQueueOffsetKey
(
long
offset
,
boolean
isBatch
)
{
if
(!
isBatch
)
{
this
.
data
.
putLong
(
MessageDecoder
.
QUEUE_OFFSET_POSITION
,
offset
);
return
;
}
for
(
byte
[]
data
:
batchData
)
{
ByteBuffer
.
wrap
(
data
).
putLong
(
MessageDecoder
.
QUEUE_OFFSET_POSITION
,
offset
++);
}
}
public
byte
[]
getData
()
{
...
...
@@ -977,8 +988,6 @@ public class DLedgerCommitLog extends CommitLog {
// The maximum length of the message
private
final
int
maxMessageSize
;
// Build Message Key
private
final
StringBuilder
keyBuilder
=
new
StringBuilder
();
MessageSerializer
(
final
int
size
)
{
this
.
maxMessageSize
=
size
;
...
...
@@ -1079,17 +1088,7 @@ public class DLedgerCommitLog extends CommitLog {
}
public
EncodeResult
serialize
(
final
MessageExtBatch
messageExtBatch
)
{
keyBuilder
.
setLength
(
0
);
keyBuilder
.
append
(
messageExtBatch
.
getTopic
());
keyBuilder
.
append
(
'-'
);
keyBuilder
.
append
(
messageExtBatch
.
getQueueId
());
String
key
=
keyBuilder
.
toString
();
Long
queueOffset
=
DLedgerCommitLog
.
this
.
topicQueueTable
.
get
(
key
);
if
(
null
==
queueOffset
)
{
queueOffset
=
0L
;
DLedgerCommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
}
String
key
=
messageExtBatch
.
getTopic
()
+
"-"
+
messageExtBatch
.
getQueueId
();
int
totalMsgLen
=
0
;
ByteBuffer
messagesByteBuff
=
messageExtBatch
.
wrap
();
...
...
@@ -1154,7 +1153,7 @@ public class DLedgerCommitLog extends CommitLog {
// 5 FLAG
msgStoreItemMemory
.
putInt
(
flag
);
// 6 QUEUEOFFSET
msgStoreItemMemory
.
putLong
(
queueOffset
++
);
msgStoreItemMemory
.
putLong
(
0L
);
// 7 PHYSICALOFFSET
msgStoreItemMemory
.
putLong
(
0
);
// 8 SYSFLAG
...
...
@@ -1210,6 +1209,7 @@ public class DLedgerCommitLog extends CommitLog {
this
.
sbr
=
sbr
;
}
@Override
public
synchronized
void
release
()
{
super
.
release
();
if
(
sbr
!=
null
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录