Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
d21266d9
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
d21266d9
编写于
7月 31, 2021
作者:
Y
yuz10
提交者:
GitHub
7月 31, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into develop3
上级
3aa49437
d067bfc2
变更
19
显示空白变更内容
内联
并排
Showing
19 changed file
with
378 addition
and
803 deletion
+378
-803
.travis.yml
.travis.yml
+2
-2
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
...cketmq/broker/processor/AbstractSendMessageProcessor.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
...pache/rocketmq/broker/processor/SendMessageProcessor.java
+1
-0
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
...apache/rocketmq/client/impl/factory/MQClientInstance.java
+3
-21
distribution/bin/runbroker.cmd
distribution/bin/runbroker.cmd
+1
-1
distribution/bin/runbroker.sh
distribution/bin/runbroker.sh
+1
-1
distribution/bin/runserver.cmd
distribution/bin/runserver.cmd
+1
-1
distribution/bin/runserver.sh
distribution/bin/runserver.sh
+3
-3
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
...org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+0
-1
store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
...java/org/apache/rocketmq/store/AppendMessageCallback.java
+3
-2
store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
...n/java/org/apache/rocketmq/store/AppendMessageResult.java
+17
-0
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+288
-472
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+9
-46
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
...e/src/main/java/org/apache/rocketmq/store/MappedFile.java
+13
-7
store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
...java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+12
-0
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
...a/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+0
-232
store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
...st/java/org/apache/rocketmq/store/AppendCallbackTest.java
+19
-9
tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
...ketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
+3
-3
未找到文件。
.travis.yml
浏览文件 @
d21266d9
...
...
@@ -16,8 +16,8 @@ matrix:
# On Linux we install latest OpenJDK 1.8 from Ubuntu repositories
-
name
:
Linux x86_64
arch
:
amd64
-
name
:
Linux aarch64
arch
:
arm64
#
- name: Linux aarch64
#
arch: arm64
cache
:
directories
:
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
浏览文件 @
d21266d9
...
...
@@ -257,8 +257,8 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
try
{
final
SendMessageRequestHeader
requestHeader
=
parseRequestHeader
(
request
);
String
namespace
=
NamespaceUtil
.
getNamespaceFromResource
(
requestHeader
.
getTopic
());
if
(
null
!=
requestHeader
)
{
String
namespace
=
NamespaceUtil
.
getNamespaceFromResource
(
requestHeader
.
getTopic
());
context
.
setNamespace
(
namespace
);
context
.
setProducerGroup
(
requestHeader
.
getProducerGroup
());
context
.
setTopic
(
requestHeader
.
getTopic
());
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
d21266d9
...
...
@@ -1445,7 +1445,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
java
.
io
.
File
commitLogDir
=
new
java
.
io
.
File
(
this
.
brokerController
.
getMessageStoreConfig
().
getStorePathRootDir
());
if
(
commitLogDir
.
exists
())
{
runtimeInfo
.
put
(
"commitLogDirCapacity"
,
String
.
format
(
"Total : %s, Free : %s."
,
MixAll
.
humanReadableByteCount
(
commitLogDir
.
getTotalSpace
(),
false
),
MixAll
.
humanReadableByteCount
(
commitLogDir
.
get
Fre
eSpace
(),
false
)));
runtimeInfo
.
put
(
"commitLogDirCapacity"
,
String
.
format
(
"Total : %s, Free : %s."
,
MixAll
.
humanReadableByteCount
(
commitLogDir
.
getTotalSpace
(),
false
),
MixAll
.
humanReadableByteCount
(
commitLogDir
.
get
Usabl
eSpace
(),
false
)));
}
return
runtimeInfo
;
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
浏览文件 @
d21266d9
...
...
@@ -612,6 +612,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
}
@Override
public
SocketAddress
getStoreHost
()
{
return
storeHost
;
}
...
...
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
浏览文件 @
d21266d9
...
...
@@ -882,24 +882,6 @@ public class MQClientInstance {
this
.
unregisterClient
(
null
,
group
);
}
private
void
unregisterClientWithLock
(
final
String
producerGroup
,
final
String
consumerGroup
)
{
try
{
if
(
this
.
lockHeartbeat
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
try
{
this
.
unregisterClient
(
producerGroup
,
consumerGroup
);
}
catch
(
Exception
e
)
{
log
.
error
(
"unregisterClient exception"
,
e
);
}
finally
{
this
.
lockHeartbeat
.
unlock
();
}
}
else
{
log
.
warn
(
"lock heartBeat, but failed. [{}]"
,
this
.
clientId
);
}
}
catch
(
InterruptedException
e
)
{
log
.
warn
(
"unregisterClientWithLock exception"
,
e
);
}
}
private
void
unregisterClient
(
final
String
producerGroup
,
final
String
consumerGroup
)
{
Iterator
<
Entry
<
String
,
HashMap
<
Long
,
String
>>>
it
=
this
.
brokerAddrTable
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
...
...
@@ -927,7 +909,7 @@ public class MQClientInstance {
}
}
public
boolean
registerProducer
(
final
String
group
,
final
DefaultMQProducerImpl
producer
)
{
public
synchronized
boolean
registerProducer
(
final
String
group
,
final
DefaultMQProducerImpl
producer
)
{
if
(
null
==
group
||
null
==
producer
)
{
return
false
;
}
...
...
@@ -941,9 +923,9 @@ public class MQClientInstance {
return
true
;
}
public
void
unregisterProducer
(
final
String
group
)
{
public
synchronized
void
unregisterProducer
(
final
String
group
)
{
this
.
producerTable
.
remove
(
group
);
this
.
unregisterClient
WithLock
(
group
,
null
);
this
.
unregisterClient
(
group
,
null
);
}
public
boolean
registerAdminExt
(
final
String
group
,
final
MQAdminExtInner
admin
)
{
...
...
distribution/bin/runbroker.cmd
浏览文件 @
d21266d9
...
...
@@ -28,7 +28,7 @@ set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH%
rem ===========================================================================================
rem JVM Configuration
rem ===========================================================================================
set
"JAVA_OPT=
%JAVA_OPT%
-server -Xms2g -Xmx2g
-Xmn1g
"
set
"JAVA_OPT=
%JAVA_OPT%
-server -Xms2g -Xmx2g"
set
"JAVA_OPT=
%JAVA_OPT%
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
set
"JAVA_OPT=
%JAVA_OPT%
-verbose:gc -Xloggc:
%USERPROFILE%
\mq_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
set
"JAVA_OPT=
%JAVA_OPT%
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
...
...
distribution/bin/runbroker.sh
浏览文件 @
d21266d9
...
...
@@ -64,7 +64,7 @@ choose_gc_log_directory()
choose_gc_log_directory
JAVA_OPT
=
"
${
JAVA_OPT
}
-server -Xms8g -Xmx8g
-Xmn4g
"
JAVA_OPT
=
"
${
JAVA_OPT
}
-server -Xms8g -Xmx8g"
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT
=
"
${
JAVA_OPT
}
-verbose:gc -Xloggc:
${
GC_LOG_DIR
}
/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
...
...
distribution/bin/runserver.cmd
浏览文件 @
d21266d9
...
...
@@ -28,7 +28,7 @@ set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH%
set
"JAVA_OPT=
%JAVA_OPT%
-server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
set
"JAVA_OPT=
%JAVA_OPT%
-XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
set
"JAVA_OPT=
%JAVA_OPT%
-verbose:gc -Xloggc:"
%USERPROFILE%
\rmq_srv_gc.log
" -XX:+PrintGCDetails"
set
"JAVA_OPT=
%JAVA_OPT%
-verbose:gc -Xloggc:"
%USERPROFILE%
\rmq_srv_gc.log
" -XX:+PrintGCDetails
-XX:+PrintGCDateStamps
"
set
"JAVA_OPT=
%JAVA_OPT%
-XX:-OmitStackTraceInFastThrow"
set
"JAVA_OPT=
%JAVA_OPT%
-XX:-UseLargePages"
set
"JAVA_OPT=
%JAVA_OPT%
-Djava.ext.dirs=
%BASE_DIR%
lib;
%JAVA_HOME%
\jre\lib\ext"
...
...
distribution/bin/runserver.sh
浏览文件 @
d21266d9
...
...
@@ -68,18 +68,18 @@ choose_gc_options()
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION
=
$(
"
$JAVA
"
-version
2>&1 |
sed
-r
-n
's/.* version "([0-9]*).*$/\1/p'
)
if
[
-z
"
$JAVA_MAJOR_VERSION
"
]
||
[
"
$JAVA_MAJOR_VERSION
"
-lt
"9"
]
;
then
JAVA_OPT
=
"
${
JAVA_OPT
}
-server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT
=
"
${
JAVA_OPT
}
-verbose:gc -Xloggc:
${
GC_LOG_DIR
}
/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT
=
"
${
JAVA_OPT
}
-verbose:gc -Xloggc:
${
GC_LOG_DIR
}
/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails
-XX:+PrintGCDateStamps
"
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT
=
"
${
JAVA_OPT
}
-server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT
=
"
${
JAVA_OPT
}
-Xlog:gc*:file=
${
GC_LOG_DIR
}
/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
choose_gc_log_directory
JAVA_OPT
=
"
${
JAVA_OPT
}
-server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
choose_gc_options
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:-OmitStackTraceInFastThrow"
JAVA_OPT
=
"
${
JAVA_OPT
}
-XX:-UseLargePages"
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
浏览文件 @
d21266d9
...
...
@@ -37,7 +37,6 @@ public class NettySystemConfig {
public
static
final
String
COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT
=
"com.rocketmq.remoting.client.closeSocketIfTimeout"
;
public
static
final
boolean
NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE
=
//
Boolean
.
parseBoolean
(
System
.
getProperty
(
COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE
,
"false"
));
public
static
final
int
CLIENT_ASYNC_SEMAPHORE_VALUE
=
//
...
...
store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
浏览文件 @
d21266d9
...
...
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
import
java.nio.ByteBuffer
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.store.CommitLog.PutMessageContext
;
/**
* Write messages callback interface
...
...
@@ -30,7 +31,7 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msg
);
final
int
maxBlank
,
final
MessageExtBrokerInner
msg
,
PutMessageContext
putMessageContext
);
/**
* After batched message serialization, write MapedByteBuffer
...
...
@@ -39,5 +40,5 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
);
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
,
PutMessageContext
putMessageContext
);
}
store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
浏览文件 @
d21266d9
...
...
@@ -16,6 +16,8 @@
*/
package
org.apache.rocketmq.store
;
import
java.util.function.Supplier
;
/**
* When write a message to the commit log, returns results
*/
...
...
@@ -28,6 +30,7 @@ public class AppendMessageResult {
private
int
wroteBytes
;
// Message ID
private
String
msgId
;
private
Supplier
<
String
>
msgIdSupplier
;
// Message storage timestamp
private
long
storeTimestamp
;
// Consume queue's offset(step by one)
...
...
@@ -51,6 +54,17 @@ public class AppendMessageResult {
this
.
pagecacheRT
=
pagecacheRT
;
}
public
AppendMessageResult
(
AppendMessageStatus
status
,
long
wroteOffset
,
int
wroteBytes
,
Supplier
<
String
>
msgIdSupplier
,
long
storeTimestamp
,
long
logicsOffset
,
long
pagecacheRT
)
{
this
.
status
=
status
;
this
.
wroteOffset
=
wroteOffset
;
this
.
wroteBytes
=
wroteBytes
;
this
.
msgIdSupplier
=
msgIdSupplier
;
this
.
storeTimestamp
=
storeTimestamp
;
this
.
logicsOffset
=
logicsOffset
;
this
.
pagecacheRT
=
pagecacheRT
;
}
public
long
getPagecacheRT
()
{
return
pagecacheRT
;
}
...
...
@@ -88,6 +102,9 @@ public class AppendMessageResult {
}
public
String
getMsgId
()
{
if
(
msgId
==
null
&&
msgIdSupplier
!=
null
)
{
msgId
=
msgIdSupplier
.
get
();
}
return
msgId
;
}
...
...
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
d21266d9
...
...
@@ -16,17 +16,18 @@
*/
package
org.apache.rocketmq.store
;
import
java.net.Inet4Address
;
import
java.net.Inet6Address
;
import
java.net.InetAddress
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.nio.ByteBuffer
;
import
java.util.HashMap
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
java.util.function.Supplier
;
import
org.apache.rocketmq.common.ServiceThread
;
import
org.apache.rocketmq.common.UtilAll
;
...
...
@@ -62,7 +63,7 @@ public class CommitLog {
private
final
FlushCommitLogService
commitLogService
;
private
final
AppendMessageCallback
appendMessageCallback
;
private
final
ThreadLocal
<
MessageExtBatchEncoder
>
batchEncoder
ThreadLocal
;
private
final
ThreadLocal
<
PutMessageThreadLocal
>
putMessage
ThreadLocal
;
protected
HashMap
<
String
/* topic-queueid */
,
Long
/* offset */
>
topicQueueTable
=
new
HashMap
<
String
,
Long
>(
1024
);
protected
volatile
long
confirmOffset
=
-
1L
;
...
...
@@ -84,10 +85,10 @@ public class CommitLog {
this
.
commitLogService
=
new
CommitRealTimeService
();
this
.
appendMessageCallback
=
new
DefaultAppendMessageCallback
(
defaultMessageStore
.
getMessageStoreConfig
().
getMaxMessageSize
());
batchEncoderThreadLocal
=
new
ThreadLocal
<
MessageExtBatchEncoder
>()
{
putMessageThreadLocal
=
new
ThreadLocal
<
PutMessageThreadLocal
>()
{
@Override
protected
MessageExtBatchEncoder
initialValue
()
{
return
new
MessageExtBatchEncoder
(
defaultMessageStore
.
getMessageStoreConfig
().
getMaxMessageSize
());
protected
PutMessageThreadLocal
initialValue
()
{
return
new
PutMessageThreadLocal
(
defaultMessageStore
.
getMessageStoreConfig
().
getMaxMessageSize
());
}
};
this
.
putMessageLock
=
defaultMessageStore
.
getMessageStoreConfig
().
isUseReentrantLockWhenPutMessage
()
?
new
PutMessageReentrantLock
()
:
new
PutMessageSpinLock
();
...
...
@@ -555,6 +556,14 @@ public class CommitLog {
return
beginTimeInLock
;
}
private
String
generateKey
(
StringBuilder
keyBuilder
,
MessageExt
messageExt
)
{
keyBuilder
.
setLength
(
0
);
keyBuilder
.
append
(
messageExt
.
getTopic
());
keyBuilder
.
append
(
'-'
);
keyBuilder
.
append
(
messageExt
.
getQueueId
());
return
keyBuilder
.
toString
();
}
public
CompletableFuture
<
PutMessageResult
>
asyncPutMessage
(
final
MessageExtBrokerInner
msg
)
{
// Set the storage time
msg
.
setStoreTimestamp
(
System
.
currentTimeMillis
());
...
...
@@ -591,12 +600,30 @@ public class CommitLog {
}
}
InetSocketAddress
bornSocketAddress
=
(
InetSocketAddress
)
msg
.
getBornHost
();
if
(
bornSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
msg
.
setBornHostV6Flag
();
}
InetSocketAddress
storeSocketAddress
=
(
InetSocketAddress
)
msg
.
getStoreHost
();
if
(
storeSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
msg
.
setStoreHostAddressV6Flag
();
}
PutMessageThreadLocal
putMessageThreadLocal
=
this
.
putMessageThreadLocal
.
get
();
PutMessageResult
encodeResult
=
putMessageThreadLocal
.
getEncoder
().
encode
(
msg
);
if
(
encodeResult
!=
null
)
{
return
CompletableFuture
.
completedFuture
(
encodeResult
);
}
msg
.
setEncodedBuff
(
putMessageThreadLocal
.
getEncoder
().
encoderBuffer
);
PutMessageContext
putMessageContext
=
new
PutMessageContext
(
generateKey
(
putMessageThreadLocal
.
getKeyBuilder
(),
msg
));
long
elapsedTimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
try
{
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
long
beginLockTimestamp
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
this
.
beginTimeInLock
=
beginLockTimestamp
;
...
...
@@ -613,7 +640,7 @@ public class CommitLog {
return
CompletableFuture
.
completedFuture
(
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
null
));
}
result
=
mappedFile
.
appendMessage
(
msg
,
this
.
appendMessageCallback
);
result
=
mappedFile
.
appendMessage
(
msg
,
this
.
appendMessageCallback
,
putMessageContext
);
switch
(
result
.
getStatus
())
{
case
PUT_OK:
break
;
...
...
@@ -627,7 +654,7 @@ public class CommitLog {
beginTimeInLock
=
0
;
return
CompletableFuture
.
completedFuture
(
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
result
));
}
result
=
mappedFile
.
appendMessage
(
msg
,
this
.
appendMessageCallback
);
result
=
mappedFile
.
appendMessage
(
msg
,
this
.
appendMessageCallback
,
putMessageContext
);
break
;
case
MESSAGE_SIZE_EXCEEDED:
case
PROPERTIES_SIZE_EXCEEDED:
...
...
@@ -693,14 +720,26 @@ public class CommitLog {
return
CompletableFuture
.
completedFuture
(
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
));
}
InetSocketAddress
bornSocketAddress
=
(
InetSocketAddress
)
messageExtBatch
.
getBornHost
();
if
(
bornSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
messageExtBatch
.
setBornHostV6Flag
();
}
InetSocketAddress
storeSocketAddress
=
(
InetSocketAddress
)
messageExtBatch
.
getStoreHost
();
if
(
storeSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
messageExtBatch
.
setStoreHostAddressV6Flag
();
}
long
elapsedTimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder
batchEncoder
=
batchEncoderThreadLocal
.
get
();
PutMessageThreadLocal
pmThreadLocal
=
this
.
putMessageThreadLocal
.
get
();
MessageExtEncoder
batchEncoder
=
pmThreadLocal
.
getEncoder
();
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
PutMessageContext
putMessageContext
=
new
PutMessageContext
(
generateKey
(
pmThreadLocal
.
getKeyBuilder
(),
messageExtBatch
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
,
putMessageContext
));
putMessageLock
.
lock
();
try
{
...
...
@@ -720,7 +759,7 @@ public class CommitLog {
return
CompletableFuture
.
completedFuture
(
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
null
));
}
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
);
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
,
putMessageContext
);
switch
(
result
.
getStatus
())
{
case
PUT_OK:
break
;
...
...
@@ -734,7 +773,7 @@ public class CommitLog {
beginTimeInLock
=
0
;
return
CompletableFuture
.
completedFuture
(
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
result
));
}
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
);
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
,
putMessageContext
);
break
;
case
MESSAGE_SIZE_EXCEEDED:
case
PROPERTIES_SIZE_EXCEEDED:
...
...
@@ -784,129 +823,6 @@ public class CommitLog {
}
public
PutMessageResult
putMessage
(
final
MessageExtBrokerInner
msg
)
{
// Set the storage time
msg
.
setStoreTimestamp
(
System
.
currentTimeMillis
());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg
.
setBodyCRC
(
UtilAll
.
crc32
(
msg
.
getBody
()));
// Back to Results
AppendMessageResult
result
=
null
;
StoreStatsService
storeStatsService
=
this
.
defaultMessageStore
.
getStoreStatsService
();
String
topic
=
msg
.
getTopic
();
int
queueId
=
msg
.
getQueueId
();
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
msg
.
getSysFlag
());
if
(
tranType
==
MessageSysFlag
.
TRANSACTION_NOT_TYPE
||
tranType
==
MessageSysFlag
.
TRANSACTION_COMMIT_TYPE
)
{
// Delay Delivery
if
(
msg
.
getDelayTimeLevel
()
>
0
)
{
if
(
msg
.
getDelayTimeLevel
()
>
this
.
defaultMessageStore
.
getScheduleMessageService
().
getMaxDelayLevel
())
{
msg
.
setDelayTimeLevel
(
this
.
defaultMessageStore
.
getScheduleMessageService
().
getMaxDelayLevel
());
}
topic
=
TopicValidator
.
RMQ_SYS_SCHEDULE_TOPIC
;
queueId
=
ScheduleMessageService
.
delayLevel2QueueId
(
msg
.
getDelayTimeLevel
());
// Backup real topic, queueId
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_REAL_TOPIC
,
msg
.
getTopic
());
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_REAL_QUEUE_ID
,
String
.
valueOf
(
msg
.
getQueueId
()));
msg
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msg
.
getProperties
()));
msg
.
setTopic
(
topic
);
msg
.
setQueueId
(
queueId
);
}
}
InetSocketAddress
bornSocketAddress
=
(
InetSocketAddress
)
msg
.
getBornHost
();
if
(
bornSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
msg
.
setBornHostV6Flag
();
}
InetSocketAddress
storeSocketAddress
=
(
InetSocketAddress
)
msg
.
getStoreHost
();
if
(
storeSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
msg
.
setStoreHostAddressV6Flag
();
}
long
elapsedTimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
try
{
long
beginLockTimestamp
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
this
.
beginTimeInLock
=
beginLockTimestamp
;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg
.
setStoreTimestamp
(
beginLockTimestamp
);
if
(
null
==
mappedFile
||
mappedFile
.
isFull
())
{
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
(
0
);
// Mark: NewFile may be cause noise
}
if
(
null
==
mappedFile
)
{
log
.
error
(
"create mapped file1 error, topic: "
+
msg
.
getTopic
()
+
" clientAddr: "
+
msg
.
getBornHostString
());
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
null
);
}
result
=
mappedFile
.
appendMessage
(
msg
,
this
.
appendMessageCallback
);
switch
(
result
.
getStatus
())
{
case
PUT_OK:
break
;
case
END_OF_FILE:
unlockMappedFile
=
mappedFile
;
// Create a new file, re-write the message
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
(
0
);
if
(
null
==
mappedFile
)
{
// XXX: warn and notify me
log
.
error
(
"create mapped file2 error, topic: "
+
msg
.
getTopic
()
+
" clientAddr: "
+
msg
.
getBornHostString
());
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
result
);
}
result
=
mappedFile
.
appendMessage
(
msg
,
this
.
appendMessageCallback
);
break
;
case
MESSAGE_SIZE_EXCEEDED:
case
PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
result
);
case
UNKNOWN_ERROR:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
default
:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
}
elapsedTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
beginTimeInLock
=
0
;
}
finally
{
putMessageLock
.
unlock
();
}
if
(
elapsedTimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
elapsedTimeInLock
,
msg
.
getBody
().
length
,
result
);
}
if
(
null
!=
unlockMappedFile
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isWarmMapedFileEnable
())
{
this
.
defaultMessageStore
.
unlockMappedFile
(
unlockMappedFile
);
}
PutMessageResult
putMessageResult
=
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
result
);
// Statistics
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
msg
.
getTopic
()).
incrementAndGet
();
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
topic
).
addAndGet
(
result
.
getWroteBytes
());
handleDiskFlush
(
result
,
putMessageResult
,
msg
);
handleHA
(
result
,
putMessageResult
,
msg
);
return
putMessageResult
;
}
public
CompletableFuture
<
PutMessageStatus
>
submitFlushRequest
(
AppendMessageResult
result
,
MessageExt
messageExt
)
{
// Synchronization flush
if
(
FlushDiskType
.
SYNC_FLUSH
==
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushDiskType
())
{
...
...
@@ -951,179 +867,6 @@ public class CommitLog {
return
CompletableFuture
.
completedFuture
(
PutMessageStatus
.
PUT_OK
);
}
public
void
handleDiskFlush
(
AppendMessageResult
result
,
PutMessageResult
putMessageResult
,
MessageExt
messageExt
)
{
// Synchronization flush
if
(
FlushDiskType
.
SYNC_FLUSH
==
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushDiskType
())
{
final
GroupCommitService
service
=
(
GroupCommitService
)
this
.
flushCommitLogService
;
if
(
messageExt
.
isWaitStoreMsgOK
())
{
GroupCommitRequest
request
=
new
GroupCommitRequest
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
());
service
.
putRequest
(
request
);
CompletableFuture
<
PutMessageStatus
>
flushOkFuture
=
request
.
future
();
PutMessageStatus
flushStatus
=
null
;
try
{
flushStatus
=
flushOkFuture
.
get
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
(),
TimeUnit
.
MILLISECONDS
);
}
catch
(
InterruptedException
|
ExecutionException
|
TimeoutException
e
)
{
//flushOK=false;
}
if
(
flushStatus
!=
PutMessageStatus
.
PUT_OK
)
{
log
.
error
(
"do groupcommit, wait for flush failed, topic: "
+
messageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
);
}
}
else
{
service
.
wakeup
();
}
}
// Asynchronous flush
else
{
if
(!
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isTransientStorePoolEnable
())
{
flushCommitLogService
.
wakeup
();
}
else
{
commitLogService
.
wakeup
();
}
}
}
public
void
handleHA
(
AppendMessageResult
result
,
PutMessageResult
putMessageResult
,
MessageExt
messageExt
)
{
if
(
BrokerRole
.
SYNC_MASTER
==
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getBrokerRole
())
{
HAService
service
=
this
.
defaultMessageStore
.
getHaService
();
if
(
messageExt
.
isWaitStoreMsgOK
())
{
// Determine whether to wait
if
(
service
.
isSlaveOK
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
()))
{
GroupCommitRequest
request
=
new
GroupCommitRequest
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
());
service
.
putRequest
(
request
);
service
.
getWaitNotifyObject
().
wakeupAll
();
PutMessageStatus
replicaStatus
=
null
;
try
{
replicaStatus
=
request
.
future
().
get
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
(),
TimeUnit
.
MILLISECONDS
);
}
catch
(
InterruptedException
|
ExecutionException
|
TimeoutException
e
)
{
}
if
(
replicaStatus
!=
PutMessageStatus
.
PUT_OK
)
{
log
.
error
(
"do sync transfer other node, wait return, but failed, topic: "
+
messageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostNameString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
);
}
}
// Slave problem
else
{
// Tell the producer, slave not available
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
SLAVE_NOT_AVAILABLE
);
}
}
}
}
public
PutMessageResult
putMessages
(
final
MessageExtBatch
messageExtBatch
)
{
messageExtBatch
.
setStoreTimestamp
(
System
.
currentTimeMillis
());
AppendMessageResult
result
;
StoreStatsService
storeStatsService
=
this
.
defaultMessageStore
.
getStoreStatsService
();
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
messageExtBatch
.
getSysFlag
());
if
(
tranType
!=
MessageSysFlag
.
TRANSACTION_NOT_TYPE
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
if
(
messageExtBatch
.
getDelayTimeLevel
()
>
0
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
InetSocketAddress
bornSocketAddress
=
(
InetSocketAddress
)
messageExtBatch
.
getBornHost
();
if
(
bornSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
messageExtBatch
.
setBornHostV6Flag
();
}
InetSocketAddress
storeSocketAddress
=
(
InetSocketAddress
)
messageExtBatch
.
getStoreHost
();
if
(
storeSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
messageExtBatch
.
setStoreHostAddressV6Flag
();
}
long
elapsedTimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder
batchEncoder
=
batchEncoderThreadLocal
.
get
();
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
putMessageLock
.
lock
();
try
{
long
beginLockTimestamp
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
this
.
beginTimeInLock
=
beginLockTimestamp
;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch
.
setStoreTimestamp
(
beginLockTimestamp
);
if
(
null
==
mappedFile
||
mappedFile
.
isFull
())
{
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
(
0
);
// Mark: NewFile may be cause noise
}
if
(
null
==
mappedFile
)
{
log
.
error
(
"Create mapped file1 error, topic: {} clientAddr: {}"
,
messageExtBatch
.
getTopic
(),
messageExtBatch
.
getBornHostString
());
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
null
);
}
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
);
switch
(
result
.
getStatus
())
{
case
PUT_OK:
break
;
case
END_OF_FILE:
unlockMappedFile
=
mappedFile
;
// Create a new file, re-write the message
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
(
0
);
if
(
null
==
mappedFile
)
{
// XXX: warn and notify me
log
.
error
(
"Create mapped file2 error, topic: {} clientAddr: {}"
,
messageExtBatch
.
getTopic
(),
messageExtBatch
.
getBornHostString
());
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
result
);
}
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
);
break
;
case
MESSAGE_SIZE_EXCEEDED:
case
PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
result
);
case
UNKNOWN_ERROR:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
default
:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
}
elapsedTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
beginTimeInLock
=
0
;
}
finally
{
putMessageLock
.
unlock
();
}
if
(
elapsedTimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
elapsedTimeInLock
,
messageExtBatch
.
getBody
().
length
,
result
);
}
if
(
null
!=
unlockMappedFile
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isWarmMapedFileEnable
())
{
this
.
defaultMessageStore
.
unlockMappedFile
(
unlockMappedFile
);
}
PutMessageResult
putMessageResult
=
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
result
);
// Statistics
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
messageExtBatch
.
getTopic
()).
addAndGet
(
result
.
getMsgNum
());
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
messageExtBatch
.
getTopic
()).
addAndGet
(
result
.
getWroteBytes
());
handleDiskFlush
(
result
,
putMessageResult
,
messageExtBatch
);
handleHA
(
result
,
putMessageResult
,
messageExtBatch
);
return
putMessageResult
;
}
/**
* According to receive certain message or offset storage time if an error occurs, it returns -1
*/
...
...
@@ -1509,50 +1252,33 @@ public class CommitLog {
private
final
ByteBuffer
msgStoreItemMemory
;
// The maximum length of the message
private
final
int
maxMessageSize
;
// Build Message Key
private
final
StringBuilder
keyBuilder
=
new
StringBuilder
();
private
final
StringBuilder
msgIdBuilder
=
new
StringBuilder
();
DefaultAppendMessageCallback
(
final
int
size
)
{
this
.
msgIdMemory
=
ByteBuffer
.
allocate
(
4
+
4
+
8
);
this
.
msgIdV6Memory
=
ByteBuffer
.
allocate
(
16
+
4
+
8
);
this
.
msgStoreItemMemory
=
ByteBuffer
.
allocate
(
size
+
END_FILE_MIN_BLANK_LENGTH
);
this
.
msgStoreItemMemory
=
ByteBuffer
.
allocate
(
END_FILE_MIN_BLANK_LENGTH
);
this
.
maxMessageSize
=
size
;
}
public
ByteBuffer
getMsgStoreItemMemory
()
{
return
msgStoreItemMemory
;
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msgInner
)
{
final
MessageExtBrokerInner
msgInner
,
PutMessageContext
putMessageContext
)
{
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long
wroteOffset
=
fileFromOffset
+
byteBuffer
.
position
();
Supplier
<
String
>
msgIdSupplier
=
()
->
{
int
sysflag
=
msgInner
.
getSysFlag
();
int
bornHostLength
=
(
sysflag
&
MessageSysFlag
.
BORNHOST_V6_FLAG
)
==
0
?
4
+
4
:
16
+
4
;
int
storeHostLength
=
(
sysflag
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
?
4
+
4
:
16
+
4
;
ByteBuffer
bornHostHolder
=
ByteBuffer
.
allocate
(
bornHostLength
);
ByteBuffer
storeHostHolder
=
ByteBuffer
.
allocate
(
storeHostLength
);
this
.
resetByteBuffer
(
storeHostHolder
,
storeHostLength
);
String
msgId
;
if
((
sysflag
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
)
{
msgId
=
MessageDecoder
.
createMessageId
(
this
.
msgIdMemory
,
msgInner
.
getStoreHostBytes
(
storeHostHolder
),
wroteOffset
);
}
else
{
msgId
=
MessageDecoder
.
createMessageId
(
this
.
msgIdV6Memory
,
msgInner
.
getStoreHostBytes
(
storeHostHolder
),
wroteOffset
);
}
int
msgIdLen
=
(
sysflag
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
?
4
+
4
+
8
:
16
+
4
+
8
;
ByteBuffer
msgIdBuffer
=
ByteBuffer
.
allocate
(
msgIdLen
);
MessageExt
.
socketAddress2ByteBuffer
(
msgInner
.
getStoreHost
(),
msgIdBuffer
);
msgIdBuffer
.
clear
();
//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer
.
putLong
(
msgIdLen
-
8
,
wroteOffset
);
return
UtilAll
.
bytes2string
(
msgIdBuffer
.
array
());
};
// Record ConsumeQueue information
keyBuilder
.
setLength
(
0
);
keyBuilder
.
append
(
msgInner
.
getTopic
());
keyBuilder
.
append
(
'-'
);
keyBuilder
.
append
(
msgInner
.
getQueueId
());
String
key
=
keyBuilder
.
toString
();
String
key
=
putMessageContext
.
getTopicQueueTableKey
();
Long
queueOffset
=
CommitLog
.
this
.
topicQueueTable
.
get
(
key
);
if
(
null
==
queueOffset
)
{
queueOffset
=
0L
;
...
...
@@ -1574,36 +1300,12 @@ public class CommitLog {
break
;
}
/**
* Serialize message
*/
final
byte
[]
propertiesData
=
msgInner
.
getPropertiesString
()
==
null
?
null
:
msgInner
.
getPropertiesString
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
propertiesLength
=
propertiesData
==
null
?
0
:
propertiesData
.
length
;
if
(
propertiesLength
>
Short
.
MAX_VALUE
)
{
log
.
warn
(
"putMessage message properties length too long. length={}"
,
propertiesData
.
length
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
PROPERTIES_SIZE_EXCEEDED
);
}
final
byte
[]
topicData
=
msgInner
.
getTopic
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
topicLength
=
topicData
.
length
;
final
int
bodyLength
=
msgInner
.
getBody
()
==
null
?
0
:
msgInner
.
getBody
().
length
;
final
int
msgLen
=
calMsgLength
(
msgInner
.
getSysFlag
(),
bodyLength
,
topicLength
,
propertiesLength
);
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLength
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
MESSAGE_SIZE_EXCEEDED
);
}
ByteBuffer
preEncodeBuffer
=
msgInner
.
getEncodedBuff
();
final
int
msgLen
=
preEncodeBuffer
.
getInt
(
0
);
// Determines whether there is sufficient free space
if
((
msgLen
+
END_FILE_MIN_BLANK_LENGTH
)
>
maxBlank
)
{
this
.
resetByteBuffer
(
this
.
msgStoreItemMemory
,
maxBlank
);
this
.
msgStoreItemMemory
.
clear
(
);
// 1 TOTALSIZE
this
.
msgStoreItemMemory
.
putInt
(
maxBlank
);
// 2 MAGICCODE
...
...
@@ -1611,60 +1313,31 @@ public class CommitLog {
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final
long
beginTimeMills
=
CommitLog
.
this
.
defaultMessageStore
.
now
();
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
maxBlank
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgId
,
msgInner
.
getStoreTimestamp
(),
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
8
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
/* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier
,
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
// Initialization of storage space
this
.
resetByteBuffer
(
msgStoreItemMemory
,
msgLen
);
// 1 TOTALSIZE
this
.
msgStoreItemMemory
.
putInt
(
msgLen
);
// 2 MAGICCODE
this
.
msgStoreItemMemory
.
putInt
(
CommitLog
.
MESSAGE_MAGIC_CODE
);
// 3 BODYCRC
this
.
msgStoreItemMemory
.
putInt
(
msgInner
.
getBodyCRC
());
// 4 QUEUEID
this
.
msgStoreItemMemory
.
putInt
(
msgInner
.
getQueueId
());
// 5 FLAG
this
.
msgStoreItemMemory
.
putInt
(
msgInner
.
getFlag
());
int
pos
=
4
+
4
+
4
+
4
+
4
;
// 6 QUEUEOFFSET
this
.
msgStoreItemMemory
.
putLong
(
queueOffset
);
preEncodeBuffer
.
putLong
(
pos
,
queueOffset
);
pos
+=
8
;
// 7 PHYSICALOFFSET
this
.
msgStoreItemMemory
.
putLong
(
fileFromOffset
+
byteBuffer
.
position
());
// 8 SYSFLAG
this
.
msgStoreItemMemory
.
putInt
(
msgInner
.
getSysFlag
());
// 9 BORNTIMESTAMP
this
.
msgStoreItemMemory
.
putLong
(
msgInner
.
getBornTimestamp
());
// 10 BORNHOST
this
.
resetByteBuffer
(
bornHostHolder
,
bornHostLength
);
this
.
msgStoreItemMemory
.
put
(
msgInner
.
getBornHostBytes
(
bornHostHolder
));
// 11 STORETIMESTAMP
this
.
msgStoreItemMemory
.
putLong
(
msgInner
.
getStoreTimestamp
());
// 12 STOREHOSTADDRESS
this
.
resetByteBuffer
(
storeHostHolder
,
storeHostLength
);
this
.
msgStoreItemMemory
.
put
(
msgInner
.
getStoreHostBytes
(
storeHostHolder
));
// 13 RECONSUMETIMES
this
.
msgStoreItemMemory
.
putInt
(
msgInner
.
getReconsumeTimes
());
// 14 Prepared Transaction Offset
this
.
msgStoreItemMemory
.
putLong
(
msgInner
.
getPreparedTransactionOffset
());
// 15 BODY
this
.
msgStoreItemMemory
.
putInt
(
bodyLength
);
if
(
bodyLength
>
0
)
this
.
msgStoreItemMemory
.
put
(
msgInner
.
getBody
());
// 16 TOPIC
this
.
msgStoreItemMemory
.
put
((
byte
)
topicLength
);
this
.
msgStoreItemMemory
.
put
(
topicData
);
// 17 PROPERTIES
this
.
msgStoreItemMemory
.
putShort
((
short
)
propertiesLength
);
if
(
propertiesLength
>
0
)
this
.
msgStoreItemMemory
.
put
(
propertiesData
);
preEncodeBuffer
.
putLong
(
pos
,
fileFromOffset
+
byteBuffer
.
position
());
int
ipLen
=
(
msgInner
.
getSysFlag
()
&
MessageSysFlag
.
BORNHOST_V6_FLAG
)
==
0
?
4
+
4
:
16
+
4
;
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos
+=
8
+
4
+
8
+
ipLen
;
// refresh store time stamp in lock
preEncodeBuffer
.
putLong
(
pos
,
msgInner
.
getStoreTimestamp
());
final
long
beginTimeMills
=
CommitLog
.
this
.
defaultMessageStore
.
now
();
// Write messages to the queue buffer
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
msgLen
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
msgLen
,
msgId
,
byteBuffer
.
put
(
preEncodeBuffer
);
msgInner
.
setEncodedBuff
(
null
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
msgLen
,
msgId
Supplier
,
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
switch
(
tranType
)
{
...
...
@@ -1683,16 +1356,12 @@ public class CommitLog {
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
)
{
final
MessageExtBatch
messageExtBatch
,
PutMessageContext
putMessageContext
)
{
byteBuffer
.
mark
();
//physical offset
long
wroteOffset
=
fileFromOffset
+
byteBuffer
.
position
();
// Record ConsumeQueue information
keyBuilder
.
setLength
(
0
);
keyBuilder
.
append
(
messageExtBatch
.
getTopic
());
keyBuilder
.
append
(
'-'
);
keyBuilder
.
append
(
messageExtBatch
.
getQueueId
());
String
key
=
keyBuilder
.
toString
();
String
key
=
putMessageContext
.
getTopicQueueTableKey
();
Long
queueOffset
=
CommitLog
.
this
.
topicQueueTable
.
get
(
key
);
if
(
null
==
queueOffset
)
{
queueOffset
=
0L
;
...
...
@@ -1701,17 +1370,35 @@ public class CommitLog {
long
beginQueueOffset
=
queueOffset
;
int
totalMsgLen
=
0
;
int
msgNum
=
0
;
msgIdBuilder
.
setLength
(
0
);
final
long
beginTimeMills
=
CommitLog
.
this
.
defaultMessageStore
.
now
();
ByteBuffer
messagesByteBuff
=
messageExtBatch
.
getEncodedBuff
();
int
sysFlag
=
messageExtBatch
.
getSysFlag
();
int
bornHostLength
=
(
sysFlag
&
MessageSysFlag
.
BORNHOST_V6_FLAG
)
==
0
?
4
+
4
:
16
+
4
;
int
storeHostLength
=
(
sysFlag
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
?
4
+
4
:
16
+
4
;
ByteBuffer
storeHostHolder
=
ByteBuffer
.
allocate
(
storeHostLength
);
Supplier
<
String
>
msgIdSupplier
=
()
->
{
int
msgIdLen
=
storeHostLength
+
8
;
int
batchCount
=
putMessageContext
.
getBatchSize
();
long
[]
phyPosArray
=
putMessageContext
.
getPhyPos
();
ByteBuffer
msgIdBuffer
=
ByteBuffer
.
allocate
(
msgIdLen
);
MessageExt
.
socketAddress2ByteBuffer
(
messageExtBatch
.
getStoreHost
(),
msgIdBuffer
);
msgIdBuffer
.
clear
();
//because socketAddress2ByteBuffer flip the buffer
StringBuilder
buffer
=
new
StringBuilder
(
batchCount
*
msgIdLen
*
2
+
batchCount
-
1
);
for
(
int
i
=
0
;
i
<
phyPosArray
.
length
;
i
++)
{
msgIdBuffer
.
putLong
(
msgIdLen
-
8
,
phyPosArray
[
i
]);
String
msgId
=
UtilAll
.
bytes2string
(
msgIdBuffer
.
array
());
if
(
i
!=
0
)
{
buffer
.
append
(
','
);
}
buffer
.
append
(
msgId
);
}
return
buffer
.
toString
();
};
this
.
resetByteBuffer
(
storeHostHolder
,
storeHostLength
);
ByteBuffer
storeHostBytes
=
messageExtBatch
.
getStoreHostBytes
(
storeHostHolder
);
messagesByteBuff
.
mark
();
int
index
=
0
;
while
(
messagesByteBuff
.
hasRemaining
())
{
// 1 TOTALSIZE
final
int
msgPos
=
messagesByteBuff
.
position
();
...
...
@@ -1726,7 +1413,7 @@ public class CommitLog {
totalMsgLen
+=
msgLen
;
// Determines whether there is sufficient free space
if
((
totalMsgLen
+
END_FILE_MIN_BLANK_LENGTH
)
>
maxBlank
)
{
this
.
resetByteBuffer
(
this
.
msgStoreItemMemory
,
8
);
this
.
msgStoreItemMemory
.
clear
(
);
// 1 TOTALSIZE
this
.
msgStoreItemMemory
.
putInt
(
maxBlank
);
// 2 MAGICCODE
...
...
@@ -1737,27 +1424,20 @@ public class CommitLog {
// Here the length of the specially set maxBlank
byteBuffer
.
reset
();
//ignore the previous appended messages
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
8
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgId
Builder
.
toString
()
,
messageExtBatch
.
getStoreTimestamp
(),
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgId
Supplier
,
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
//move to add queue offset and commitlog offset
messagesByteBuff
.
position
(
msgPos
+
20
);
messagesByteBuff
.
putLong
(
queueOffset
);
messagesByteBuff
.
putLong
(
wroteOffset
+
totalMsgLen
-
msgLen
);
storeHostBytes
.
rewind
();
String
msgId
;
if
((
sysFlag
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
)
{
msgId
=
MessageDecoder
.
createMessageId
(
this
.
msgIdMemory
,
storeHostBytes
,
wroteOffset
+
totalMsgLen
-
msgLen
);
}
else
{
msgId
=
MessageDecoder
.
createMessageId
(
this
.
msgIdV6Memory
,
storeHostBytes
,
wroteOffset
+
totalMsgLen
-
msgLen
);
}
if
(
msgIdBuilder
.
length
()
>
0
)
{
msgIdBuilder
.
append
(
','
).
append
(
msgId
);
}
else
{
msgIdBuilder
.
append
(
msgId
);
}
int
pos
=
msgPos
+
20
;
messagesByteBuff
.
putLong
(
pos
,
queueOffset
);
pos
+=
8
;
messagesByteBuff
.
putLong
(
pos
,
wroteOffset
+
totalMsgLen
-
msgLen
);
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos
+=
8
+
4
+
8
+
bornHostLength
;
// refresh store time stamp in lock
messagesByteBuff
.
putLong
(
pos
,
messageExtBatch
.
getStoreTimestamp
());
putMessageContext
.
getPhyPos
()[
index
++]
=
wroteOffset
+
totalMsgLen
-
msgLen
;
queueOffset
++;
msgNum
++;
messagesByteBuff
.
position
(
msgPos
+
msgLen
);
...
...
@@ -1767,7 +1447,7 @@ public class CommitLog {
messagesByteBuff
.
limit
(
totalMsgLen
);
byteBuffer
.
put
(
messagesByteBuff
);
messageExtBatch
.
setEncodedBuff
(
null
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
totalMsgLen
,
msgId
Builder
.
toString
()
,
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
totalMsgLen
,
msgId
Supplier
,
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
result
.
setMsgNum
(
msgNum
);
CommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
...
...
@@ -1782,19 +1462,104 @@ public class CommitLog {
}
public
static
class
MessageExt
Batch
Encoder
{
public
static
class
MessageExtEncoder
{
// Store the message content
private
final
ByteBuffer
msgBatchMemory
;
private
final
ByteBuffer
encoderBuffer
;
// The maximum length of the message
private
final
int
maxMessageSize
;
MessageExt
Batch
Encoder
(
final
int
size
)
{
this
.
msgBatchMemory
=
ByteBuffer
.
allocateDirect
(
size
);
MessageExtEncoder
(
final
int
size
)
{
this
.
encoderBuffer
=
ByteBuffer
.
allocateDirect
(
size
);
this
.
maxMessageSize
=
size
;
}
public
ByteBuffer
encode
(
final
MessageExtBatch
messageExtBatch
)
{
msgBatchMemory
.
clear
();
//not thread-safe
private
void
socketAddress2ByteBuffer
(
final
SocketAddress
socketAddress
,
final
ByteBuffer
byteBuffer
)
{
InetSocketAddress
inetSocketAddress
=
(
InetSocketAddress
)
socketAddress
;
InetAddress
address
=
inetSocketAddress
.
getAddress
();
if
(
address
instanceof
Inet4Address
)
{
byteBuffer
.
put
(
inetSocketAddress
.
getAddress
().
getAddress
(),
0
,
4
);
}
else
{
byteBuffer
.
put
(
inetSocketAddress
.
getAddress
().
getAddress
(),
0
,
16
);
}
byteBuffer
.
putInt
(
inetSocketAddress
.
getPort
());
}
protected
PutMessageResult
encode
(
MessageExtBrokerInner
msgInner
)
{
/**
* Serialize message
*/
final
byte
[]
propertiesData
=
msgInner
.
getPropertiesString
()
==
null
?
null
:
msgInner
.
getPropertiesString
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
propertiesLength
=
propertiesData
==
null
?
0
:
propertiesData
.
length
;
if
(
propertiesLength
>
Short
.
MAX_VALUE
)
{
log
.
warn
(
"putMessage message properties length too long. length={}"
,
propertiesData
.
length
);
return
new
PutMessageResult
(
PutMessageStatus
.
PROPERTIES_SIZE_EXCEEDED
,
null
);
}
final
byte
[]
topicData
=
msgInner
.
getTopic
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
topicLength
=
topicData
.
length
;
final
int
bodyLength
=
msgInner
.
getBody
()
==
null
?
0
:
msgInner
.
getBody
().
length
;
final
int
msgLen
=
calMsgLength
(
msgInner
.
getSysFlag
(),
bodyLength
,
topicLength
,
propertiesLength
);
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLength
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
// Initialization of storage space
this
.
resetByteBuffer
(
encoderBuffer
,
msgLen
);
// 1 TOTALSIZE
this
.
encoderBuffer
.
putInt
(
msgLen
);
// 2 MAGICCODE
this
.
encoderBuffer
.
putInt
(
CommitLog
.
MESSAGE_MAGIC_CODE
);
// 3 BODYCRC
this
.
encoderBuffer
.
putInt
(
msgInner
.
getBodyCRC
());
// 4 QUEUEID
this
.
encoderBuffer
.
putInt
(
msgInner
.
getQueueId
());
// 5 FLAG
this
.
encoderBuffer
.
putInt
(
msgInner
.
getFlag
());
// 6 QUEUEOFFSET, need update later
this
.
encoderBuffer
.
putLong
(
0
);
// 7 PHYSICALOFFSET, need update later
this
.
encoderBuffer
.
putLong
(
0
);
// 8 SYSFLAG
this
.
encoderBuffer
.
putInt
(
msgInner
.
getSysFlag
());
// 9 BORNTIMESTAMP
this
.
encoderBuffer
.
putLong
(
msgInner
.
getBornTimestamp
());
// 10 BORNHOST
socketAddress2ByteBuffer
(
msgInner
.
getBornHost
()
,
this
.
encoderBuffer
);
// 11 STORETIMESTAMP
this
.
encoderBuffer
.
putLong
(
msgInner
.
getStoreTimestamp
());
// 12 STOREHOSTADDRESS
socketAddress2ByteBuffer
(
msgInner
.
getStoreHost
()
,
this
.
encoderBuffer
);
// 13 RECONSUMETIMES
this
.
encoderBuffer
.
putInt
(
msgInner
.
getReconsumeTimes
());
// 14 Prepared Transaction Offset
this
.
encoderBuffer
.
putLong
(
msgInner
.
getPreparedTransactionOffset
());
// 15 BODY
this
.
encoderBuffer
.
putInt
(
bodyLength
);
if
(
bodyLength
>
0
)
this
.
encoderBuffer
.
put
(
msgInner
.
getBody
());
// 16 TOPIC
this
.
encoderBuffer
.
put
((
byte
)
topicLength
);
this
.
encoderBuffer
.
put
(
topicData
);
// 17 PROPERTIES
this
.
encoderBuffer
.
putShort
((
short
)
propertiesLength
);
if
(
propertiesLength
>
0
)
this
.
encoderBuffer
.
put
(
propertiesData
);
encoderBuffer
.
flip
();
return
null
;
}
protected
ByteBuffer
encode
(
final
MessageExtBatch
messageExtBatch
,
PutMessageContext
putMessageContext
)
{
encoderBuffer
.
clear
();
//not thread-safe
int
totalMsgLen
=
0
;
ByteBuffer
messagesByteBuff
=
messageExtBatch
.
wrap
();
...
...
@@ -1809,7 +1574,9 @@ public class CommitLog {
final
byte
[]
batchPropData
=
batchPropStr
.
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
short
batchPropLen
=
(
short
)
batchPropData
.
length
;
int
batchSize
=
0
;
while
(
messagesByteBuff
.
hasRemaining
())
{
batchSize
++;
// 1 TOTALSIZE
messagesByteBuff
.
getInt
();
// 2 MAGICCODE
...
...
@@ -1849,53 +1616,55 @@ public class CommitLog {
}
// 1 TOTALSIZE
this
.
msgBatchMemory
.
putInt
(
msgLen
);
this
.
encoderBuffer
.
putInt
(
msgLen
);
// 2 MAGICCODE
this
.
msgBatchMemory
.
putInt
(
CommitLog
.
MESSAGE_MAGIC_CODE
);
this
.
encoderBuffer
.
putInt
(
CommitLog
.
MESSAGE_MAGIC_CODE
);
// 3 BODYCRC
this
.
msgBatchMemory
.
putInt
(
bodyCrc
);
this
.
encoderBuffer
.
putInt
(
bodyCrc
);
// 4 QUEUEID
this
.
msgBatchMemory
.
putInt
(
messageExtBatch
.
getQueueId
());
this
.
encoderBuffer
.
putInt
(
messageExtBatch
.
getQueueId
());
// 5 FLAG
this
.
msgBatchMemory
.
putInt
(
flag
);
this
.
encoderBuffer
.
putInt
(
flag
);
// 6 QUEUEOFFSET
this
.
msgBatchMemory
.
putLong
(
0
);
this
.
encoderBuffer
.
putLong
(
0
);
// 7 PHYSICALOFFSET
this
.
msgBatchMemory
.
putLong
(
0
);
this
.
encoderBuffer
.
putLong
(
0
);
// 8 SYSFLAG
this
.
msgBatchMemory
.
putInt
(
messageExtBatch
.
getSysFlag
());
this
.
encoderBuffer
.
putInt
(
messageExtBatch
.
getSysFlag
());
// 9 BORNTIMESTAMP
this
.
msgBatchMemory
.
putLong
(
messageExtBatch
.
getBornTimestamp
());
this
.
encoderBuffer
.
putLong
(
messageExtBatch
.
getBornTimestamp
());
// 10 BORNHOST
this
.
resetByteBuffer
(
bornHostHolder
,
bornHostLength
);
this
.
msgBatchMemory
.
put
(
messageExtBatch
.
getBornHostBytes
(
bornHostHolder
));
this
.
encoderBuffer
.
put
(
messageExtBatch
.
getBornHostBytes
(
bornHostHolder
));
// 11 STORETIMESTAMP
this
.
msgBatchMemory
.
putLong
(
messageExtBatch
.
getStoreTimestamp
());
this
.
encoderBuffer
.
putLong
(
messageExtBatch
.
getStoreTimestamp
());
// 12 STOREHOSTADDRESS
this
.
resetByteBuffer
(
storeHostHolder
,
storeHostLength
);
this
.
msgBatchMemory
.
put
(
messageExtBatch
.
getStoreHostBytes
(
storeHostHolder
));
this
.
encoderBuffer
.
put
(
messageExtBatch
.
getStoreHostBytes
(
storeHostHolder
));
// 13 RECONSUMETIMES
this
.
msgBatchMemory
.
putInt
(
messageExtBatch
.
getReconsumeTimes
());
this
.
encoderBuffer
.
putInt
(
messageExtBatch
.
getReconsumeTimes
());
// 14 Prepared Transaction Offset, batch does not support transaction
this
.
msgBatchMemory
.
putLong
(
0
);
this
.
encoderBuffer
.
putLong
(
0
);
// 15 BODY
this
.
msgBatchMemory
.
putInt
(
bodyLen
);
this
.
encoderBuffer
.
putInt
(
bodyLen
);
if
(
bodyLen
>
0
)
this
.
msgBatchMemory
.
put
(
messagesByteBuff
.
array
(),
bodyPos
,
bodyLen
);
this
.
encoderBuffer
.
put
(
messagesByteBuff
.
array
(),
bodyPos
,
bodyLen
);
// 16 TOPIC
this
.
msgBatchMemory
.
put
((
byte
)
topicLength
);
this
.
msgBatchMemory
.
put
(
topicData
);
this
.
encoderBuffer
.
put
((
byte
)
topicLength
);
this
.
encoderBuffer
.
put
(
topicData
);
// 17 PROPERTIES
this
.
msgBatchMemory
.
putShort
((
short
)
(
propertiesLen
+
batchPropLen
));
this
.
encoderBuffer
.
putShort
((
short
)
(
propertiesLen
+
batchPropLen
));
if
(
propertiesLen
>
0
)
{
this
.
msgBatchMemory
.
put
(
messagesByteBuff
.
array
(),
propertiesPos
,
propertiesLen
);
this
.
encoderBuffer
.
put
(
messagesByteBuff
.
array
(),
propertiesPos
,
propertiesLen
);
}
if
(
batchPropLen
>
0
)
{
this
.
msgBatchMemory
.
put
(
batchPropData
,
0
,
batchPropLen
);
this
.
encoderBuffer
.
put
(
batchPropData
,
0
,
batchPropLen
);
}
}
msgBatchMemory
.
flip
();
return
msgBatchMemory
;
putMessageContext
.
setBatchSize
(
batchSize
);
putMessageContext
.
setPhyPos
(
new
long
[
batchSize
]);
encoderBuffer
.
flip
();
return
encoderBuffer
;
}
private
void
resetByteBuffer
(
final
ByteBuffer
byteBuffer
,
final
int
limit
)
{
...
...
@@ -1904,4 +1673,51 @@ public class CommitLog {
}
}
static
class
PutMessageThreadLocal
{
private
MessageExtEncoder
encoder
;
private
StringBuilder
keyBuilder
;
PutMessageThreadLocal
(
int
size
)
{
encoder
=
new
MessageExtEncoder
(
size
);
keyBuilder
=
new
StringBuilder
();
}
public
MessageExtEncoder
getEncoder
()
{
return
encoder
;
}
public
StringBuilder
getKeyBuilder
()
{
return
keyBuilder
;
}
}
static
class
PutMessageContext
{
private
String
topicQueueTableKey
;
private
long
[]
phyPos
;
private
int
batchSize
;
public
PutMessageContext
(
String
topicQueueTableKey
)
{
this
.
topicQueueTableKey
=
topicQueueTableKey
;
}
public
String
getTopicQueueTableKey
()
{
return
topicQueueTableKey
;
}
public
long
[]
getPhyPos
()
{
return
phyPos
;
}
public
void
setPhyPos
(
long
[]
phyPos
)
{
this
.
phyPos
=
phyPos
;
}
public
int
getBatchSize
()
{
return
batchSize
;
}
public
void
setBatchSize
(
int
batchSize
)
{
this
.
batchSize
=
batchSize
;
}
}
}
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
d21266d9
...
...
@@ -34,6 +34,7 @@ import java.util.Set;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
...
...
@@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore {
@Override
public
PutMessageResult
putMessage
(
MessageExtBrokerInner
msg
)
{
PutMessageStatus
checkStoreStatus
=
this
.
checkStoreStatus
();
if
(
checkStoreStatus
!=
PutMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
checkStoreStatus
,
null
);
}
PutMessageStatus
msgCheckStatus
=
this
.
checkMessage
(
msg
);
if
(
msgCheckStatus
==
PutMessageStatus
.
MESSAGE_ILLEGAL
)
{
return
new
PutMessageResult
(
msgCheckStatus
,
null
);
}
long
beginTime
=
this
.
getSystemClock
().
now
();
PutMessageResult
result
=
this
.
commitLog
.
putMessage
(
msg
);
long
elapsedTime
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
elapsedTime
>
500
)
{
log
.
warn
(
"not in lock elapsed time(ms)={}, bodyLength={}"
,
elapsedTime
,
msg
.
getBody
().
length
);
}
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
elapsedTime
);
if
(
null
==
result
||
!
result
.
isOk
())
{
this
.
storeStatsService
.
getPutMessageFailedTimes
().
incrementAndGet
();
try
{
return
asyncPutMessage
(
msg
).
get
();
}
catch
(
InterruptedException
|
ExecutionException
e
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
null
);
}
return
result
;
}
@Override
public
PutMessageResult
putMessages
(
MessageExtBatch
messageExtBatch
)
{
PutMessageStatus
checkStoreStatus
=
this
.
checkStoreStatus
();
if
(
checkStoreStatus
!=
PutMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
checkStoreStatus
,
null
);
}
PutMessageStatus
msgCheckStatus
=
this
.
checkMessages
(
messageExtBatch
);
if
(
msgCheckStatus
==
PutMessageStatus
.
MESSAGE_ILLEGAL
)
{
return
new
PutMessageResult
(
msgCheckStatus
,
null
);
}
long
beginTime
=
this
.
getSystemClock
().
now
();
PutMessageResult
result
=
this
.
commitLog
.
putMessages
(
messageExtBatch
);
long
elapsedTime
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
elapsedTime
>
500
)
{
log
.
warn
(
"not in lock elapsed time(ms)={}, bodyLength={}"
,
elapsedTime
,
messageExtBatch
.
getBody
().
length
);
}
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
elapsedTime
);
if
(
null
==
result
||
!
result
.
isOk
())
{
this
.
storeStatsService
.
getPutMessageFailedTimes
().
incrementAndGet
();
try
{
return
asyncPutMessages
(
messageExtBatch
).
get
();
}
catch
(
InterruptedException
|
ExecutionException
e
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
null
);
}
return
result
;
}
@Override
...
...
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
浏览文件 @
d21266d9
...
...
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.store.CommitLog.PutMessageContext
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.util.LibC
;
import
sun.nio.ch.DirectBuffer
;
...
...
@@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource {
return
fileChannel
;
}
public
AppendMessageResult
appendMessage
(
final
MessageExtBrokerInner
msg
,
final
AppendMessageCallback
cb
)
{
return
appendMessagesInner
(
msg
,
cb
);
public
AppendMessageResult
appendMessage
(
final
MessageExtBrokerInner
msg
,
final
AppendMessageCallback
cb
,
PutMessageContext
putMessageContext
)
{
return
appendMessagesInner
(
msg
,
cb
,
putMessageContext
);
}
public
AppendMessageResult
appendMessages
(
final
MessageExtBatch
messageExtBatch
,
final
AppendMessageCallback
cb
)
{
return
appendMessagesInner
(
messageExtBatch
,
cb
);
public
AppendMessageResult
appendMessages
(
final
MessageExtBatch
messageExtBatch
,
final
AppendMessageCallback
cb
,
PutMessageContext
putMessageContext
)
{
return
appendMessagesInner
(
messageExtBatch
,
cb
,
putMessageContext
);
}
public
AppendMessageResult
appendMessagesInner
(
final
MessageExt
messageExt
,
final
AppendMessageCallback
cb
)
{
public
AppendMessageResult
appendMessagesInner
(
final
MessageExt
messageExt
,
final
AppendMessageCallback
cb
,
PutMessageContext
putMessageContext
)
{
assert
messageExt
!=
null
;
assert
cb
!=
null
;
...
...
@@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource {
byteBuffer
.
position
(
currentPos
);
AppendMessageResult
result
;
if
(
messageExt
instanceof
MessageExtBrokerInner
)
{
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBrokerInner
)
messageExt
);
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBrokerInner
)
messageExt
,
putMessageContext
);
}
else
if
(
messageExt
instanceof
MessageExtBatch
)
{
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBatch
)
messageExt
);
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBatch
)
messageExt
,
putMessageContext
);
}
else
{
return
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
);
}
...
...
store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
浏览文件 @
d21266d9
...
...
@@ -16,6 +16,8 @@
*/
package
org.apache.rocketmq.store
;
import
java.nio.ByteBuffer
;
import
org.apache.rocketmq.common.TopicFilterType
;
import
org.apache.rocketmq.common.message.MessageExt
;
...
...
@@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt {
private
String
propertiesString
;
private
long
tagsCode
;
private
ByteBuffer
encodedBuff
;
public
ByteBuffer
getEncodedBuff
()
{
return
encodedBuff
;
}
public
void
setEncodedBuff
(
ByteBuffer
encodedBuff
)
{
this
.
encodedBuff
=
encodedBuff
;
}
public
static
long
tagsString2tagsCode
(
final
TopicFilterType
filter
,
final
String
tags
)
{
if
(
null
==
tags
||
tags
.
length
()
==
0
)
{
return
0
;
}
...
...
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
浏览文件 @
d21266d9
...
...
@@ -37,7 +37,6 @@ import java.util.HashMap;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageConst
;
...
...
@@ -413,237 +412,6 @@ public class DLedgerCommitLog extends CommitLog {
}
}
@Override
public
PutMessageResult
putMessage
(
final
MessageExtBrokerInner
msg
)
{
StoreStatsService
storeStatsService
=
this
.
defaultMessageStore
.
getStoreStatsService
();
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
msg
.
getSysFlag
());
String
topic
=
msg
.
getTopic
();
setMessageInfo
(
msg
,
tranType
);
// Back to Results
AppendMessageResult
appendResult
;
AppendFuture
<
AppendEntryResponse
>
dledgerFuture
;
EncodeResult
encodeResult
;
encodeResult
=
this
.
messageSerializer
.
serialize
(
msg
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
long
elapsedTimeInLock
;
long
queueOffset
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
false
);
AppendEntryRequest
request
=
new
AppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
request
.
setBody
(
encodeResult
.
getData
());
dledgerFuture
=
(
AppendFuture
<
AppendEntryResponse
>)
dLedgerServer
.
handleAppend
(
request
);
if
(
dledgerFuture
.
getPos
()
==
-
1
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
OS_PAGECACHE_BUSY
,
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
));
}
long
wroteOffset
=
dledgerFuture
.
getPos
()
+
DLedgerEntry
.
BODY_OFFSET
;
int
msgIdLength
=
(
msg
.
getSysFlag
()
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
?
4
+
4
+
8
:
16
+
4
+
8
;
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
msgIdLength
);
String
msgId
=
MessageDecoder
.
createMessageId
(
buffer
,
msg
.
getStoreHostBytes
(),
wroteOffset
);
elapsedTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginTimeInDledgerLock
;
appendResult
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
encodeResult
.
getData
().
length
,
msgId
,
System
.
currentTimeMillis
(),
queueOffset
,
elapsedTimeInLock
);
switch
(
tranType
)
{
case
MessageSysFlag
.
TRANSACTION_PREPARED_TYPE
:
case
MessageSysFlag
.
TRANSACTION_ROLLBACK_TYPE
:
break
;
case
MessageSysFlag
.
TRANSACTION_NOT_TYPE
:
case
MessageSysFlag
.
TRANSACTION_COMMIT_TYPE
:
// The next update ConsumeQueue information
DLedgerCommitLog
.
this
.
topicQueueTable
.
put
(
encodeResult
.
queueOffsetKey
,
queueOffset
+
1
);
break
;
default
:
break
;
}
}
catch
(
Exception
e
)
{
log
.
error
(
"Put message error"
,
e
);
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
));
}
finally
{
beginTimeInDledgerLock
=
0
;
putMessageLock
.
unlock
();
}
if
(
elapsedTimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
elapsedTimeInLock
,
msg
.
getBody
().
length
,
appendResult
);
}
PutMessageStatus
putMessageStatus
=
PutMessageStatus
.
UNKNOWN_ERROR
;
try
{
AppendEntryResponse
appendEntryResponse
=
dledgerFuture
.
get
(
3
,
TimeUnit
.
SECONDS
);
switch
(
DLedgerResponseCode
.
valueOf
(
appendEntryResponse
.
getCode
()))
{
case
SUCCESS:
putMessageStatus
=
PutMessageStatus
.
PUT_OK
;
break
;
case
INCONSISTENT_LEADER:
case
NOT_LEADER:
case
LEADER_NOT_READY:
case
DISK_FULL:
putMessageStatus
=
PutMessageStatus
.
SERVICE_NOT_AVAILABLE
;
break
;
case
WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus
=
PutMessageStatus
.
OS_PAGECACHE_BUSY
;
break
;
case
LEADER_PENDING_FULL:
putMessageStatus
=
PutMessageStatus
.
OS_PAGECACHE_BUSY
;
break
;
}
}
catch
(
Throwable
t
)
{
log
.
error
(
"Failed to get dledger append result"
,
t
);
}
PutMessageResult
putMessageResult
=
new
PutMessageResult
(
putMessageStatus
,
appendResult
);
if
(
putMessageStatus
==
PutMessageStatus
.
PUT_OK
)
{
// Statistics
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
msg
.
getTopic
()).
incrementAndGet
();
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
topic
).
addAndGet
(
appendResult
.
getWroteBytes
());
}
return
putMessageResult
;
}
@Override
public
PutMessageResult
putMessages
(
final
MessageExtBatch
messageExtBatch
)
{
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
messageExtBatch
.
getSysFlag
());
if
(
tranType
!=
MessageSysFlag
.
TRANSACTION_NOT_TYPE
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
if
(
messageExtBatch
.
getDelayTimeLevel
()
>
0
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
// Set the storage time
messageExtBatch
.
setStoreTimestamp
(
System
.
currentTimeMillis
());
StoreStatsService
storeStatsService
=
this
.
defaultMessageStore
.
getStoreStatsService
();
InetSocketAddress
bornSocketAddress
=
(
InetSocketAddress
)
messageExtBatch
.
getBornHost
();
if
(
bornSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
messageExtBatch
.
setBornHostV6Flag
();
}
InetSocketAddress
storeSocketAddress
=
(
InetSocketAddress
)
messageExtBatch
.
getStoreHost
();
if
(
storeSocketAddress
.
getAddress
()
instanceof
Inet6Address
)
{
messageExtBatch
.
setStoreHostAddressV6Flag
();
}
// Back to Results
AppendMessageResult
appendResult
;
BatchAppendFuture
<
AppendEntryResponse
>
dledgerFuture
;
EncodeResult
encodeResult
;
encodeResult
=
this
.
messageSerializer
.
serialize
(
messageExtBatch
);
if
(
encodeResult
.
status
!=
AppendMessageStatus
.
PUT_OK
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
new
AppendMessageResult
(
encodeResult
.
status
));
}
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
msgIdBuilder
.
setLength
(
0
);
long
elapsedTimeInLock
;
long
queueOffset
;
int
msgNum
=
0
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
queueOffset
=
getQueueOffsetByKey
(
encodeResult
.
queueOffsetKey
,
tranType
);
encodeResult
.
setQueueOffsetKey
(
queueOffset
,
true
);
BatchAppendEntryRequest
request
=
new
BatchAppendEntryRequest
();
request
.
setGroup
(
dLedgerConfig
.
getGroup
());
request
.
setRemoteId
(
dLedgerServer
.
getMemberState
().
getSelfId
());
request
.
setBatchMsgs
(
encodeResult
.
batchData
);
AppendFuture
<
AppendEntryResponse
>
appendFuture
=
(
AppendFuture
<
AppendEntryResponse
>)
dLedgerServer
.
handleAppend
(
request
);
if
(
appendFuture
.
getPos
()
==
-
1
)
{
log
.
warn
(
"HandleAppend return false due to error code {}"
,
appendFuture
.
get
().
getCode
());
return
new
PutMessageResult
(
PutMessageStatus
.
OS_PAGECACHE_BUSY
,
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
));
}
dledgerFuture
=
(
BatchAppendFuture
<
AppendEntryResponse
>)
appendFuture
;
long
wroteOffset
=
0
;
int
msgIdLength
=
(
messageExtBatch
.
getSysFlag
()
&
MessageSysFlag
.
STOREHOSTADDRESS_V6_FLAG
)
==
0
?
4
+
4
+
8
:
16
+
4
+
8
;
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
msgIdLength
);
boolean
isFirstOffset
=
true
;
long
firstWroteOffset
=
0
;
for
(
long
pos
:
dledgerFuture
.
getPositions
())
{
wroteOffset
=
pos
+
DLedgerEntry
.
BODY_OFFSET
;
if
(
isFirstOffset
)
{
firstWroteOffset
=
wroteOffset
;
isFirstOffset
=
false
;
}
String
msgId
=
MessageDecoder
.
createMessageId
(
buffer
,
messageExtBatch
.
getStoreHostBytes
(),
wroteOffset
);
if
(
msgIdBuilder
.
length
()
>
0
)
{
msgIdBuilder
.
append
(
','
).
append
(
msgId
);
}
else
{
msgIdBuilder
.
append
(
msgId
);
}
msgNum
++;
}
elapsedTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginTimeInDledgerLock
;
appendResult
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
firstWroteOffset
,
encodeResult
.
totalMsgLen
,
msgIdBuilder
.
toString
(),
System
.
currentTimeMillis
(),
queueOffset
,
elapsedTimeInLock
);
appendResult
.
setMsgNum
(
msgNum
);
DLedgerCommitLog
.
this
.
topicQueueTable
.
put
(
encodeResult
.
queueOffsetKey
,
queueOffset
+
msgNum
);
}
catch
(
Exception
e
)
{
log
.
error
(
"Put message error"
,
e
);
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
));
}
finally
{
beginTimeInDledgerLock
=
0
;
putMessageLock
.
unlock
();
}
if
(
elapsedTimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
elapsedTimeInLock
,
messageExtBatch
.
getBody
().
length
,
appendResult
);
}
PutMessageStatus
putMessageStatus
=
PutMessageStatus
.
UNKNOWN_ERROR
;
try
{
AppendEntryResponse
appendEntryResponse
=
dledgerFuture
.
get
(
3
,
TimeUnit
.
SECONDS
);
switch
(
DLedgerResponseCode
.
valueOf
(
appendEntryResponse
.
getCode
()))
{
case
SUCCESS:
putMessageStatus
=
PutMessageStatus
.
PUT_OK
;
break
;
case
INCONSISTENT_LEADER:
case
NOT_LEADER:
case
LEADER_NOT_READY:
case
DISK_FULL:
putMessageStatus
=
PutMessageStatus
.
SERVICE_NOT_AVAILABLE
;
break
;
case
WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus
=
PutMessageStatus
.
OS_PAGECACHE_BUSY
;
break
;
case
LEADER_PENDING_FULL:
putMessageStatus
=
PutMessageStatus
.
OS_PAGECACHE_BUSY
;
break
;
}
}
catch
(
Throwable
t
)
{
log
.
error
(
"Failed to get dledger append result"
,
t
);
}
PutMessageResult
putMessageResult
=
new
PutMessageResult
(
putMessageStatus
,
appendResult
);
if
(
putMessageStatus
==
PutMessageStatus
.
PUT_OK
)
{
// Statistics
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
messageExtBatch
.
getTopic
()).
addAndGet
(
msgNum
);
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
messageExtBatch
.
getTopic
()).
addAndGet
(
encodeResult
.
totalMsgLen
);
}
return
putMessageResult
;
}
@Override
public
CompletableFuture
<
PutMessageResult
>
asyncPutMessage
(
MessageExtBrokerInner
msg
)
{
...
...
store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
浏览文件 @
d21266d9
...
...
@@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.store.CommitLog.MessageExtEncoder
;
import
org.apache.rocketmq.store.CommitLog.PutMessageContext
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.junit.After
;
import
org.junit.Before
;
...
...
@@ -42,7 +44,7 @@ public class AppendCallbackTest {
AppendMessageCallback
callback
;
CommitLog
.
MessageExtBatchEncoder
batchEncoder
=
new
CommitLog
.
MessageExtBatch
Encoder
(
10
*
1024
*
1024
);
MessageExtEncoder
batchEncoder
=
new
MessageExt
Encoder
(
10
*
1024
*
1024
);
@Before
public
void
init
()
throws
Exception
{
...
...
@@ -84,10 +86,12 @@ public class AppendCallbackTest {
messageExtBatch
.
setStoreHost
(
new
InetSocketAddress
(
"127.0.0.1"
,
124
));
messageExtBatch
.
setBody
(
MessageDecoder
.
encodeMessages
(
messages
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
PutMessageContext
putMessageContext
=
new
PutMessageContext
(
topic
+
"-"
+
queue
);
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
,
putMessageContext
));
ByteBuffer
buff
=
ByteBuffer
.
allocate
(
1024
*
10
);
//encounter end of file when append half of the data
AppendMessageResult
result
=
callback
.
doAppend
(
0
,
buff
,
1000
,
messageExtBatch
);
AppendMessageResult
result
=
callback
.
doAppend
(
0
,
buff
,
1000
,
messageExtBatch
,
putMessageContext
);
assertEquals
(
AppendMessageStatus
.
END_OF_FILE
,
result
.
getStatus
());
assertEquals
(
0
,
result
.
getWroteOffset
());
assertEquals
(
0
,
result
.
getLogicsOffset
());
...
...
@@ -121,10 +125,12 @@ public class AppendCallbackTest {
messageExtBatch
.
setStoreHost
(
new
InetSocketAddress
(
"::1"
,
124
));
messageExtBatch
.
setBody
(
MessageDecoder
.
encodeMessages
(
messages
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
PutMessageContext
putMessageContext
=
new
PutMessageContext
(
topic
+
"-"
+
queue
);
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
,
putMessageContext
));
ByteBuffer
buff
=
ByteBuffer
.
allocate
(
1024
*
10
);
//encounter end of file when append half of the data
AppendMessageResult
result
=
callback
.
doAppend
(
0
,
buff
,
1000
,
messageExtBatch
);
AppendMessageResult
result
=
callback
.
doAppend
(
0
,
buff
,
1000
,
messageExtBatch
,
putMessageContext
);
assertEquals
(
AppendMessageStatus
.
END_OF_FILE
,
result
.
getStatus
());
assertEquals
(
0
,
result
.
getWroteOffset
());
assertEquals
(
0
,
result
.
getLogicsOffset
());
...
...
@@ -154,9 +160,11 @@ public class AppendCallbackTest {
messageExtBatch
.
setStoreHost
(
new
InetSocketAddress
(
"127.0.0.1"
,
124
));
messageExtBatch
.
setBody
(
MessageDecoder
.
encodeMessages
(
messages
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
PutMessageContext
putMessageContext
=
new
PutMessageContext
(
topic
+
"-"
+
queue
);
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
,
putMessageContext
));
ByteBuffer
buff
=
ByteBuffer
.
allocate
(
1024
*
10
);
AppendMessageResult
allresult
=
callback
.
doAppend
(
0
,
buff
,
1024
*
10
,
messageExtBatch
);
AppendMessageResult
allresult
=
callback
.
doAppend
(
0
,
buff
,
1024
*
10
,
messageExtBatch
,
putMessageContext
);
assertEquals
(
AppendMessageStatus
.
PUT_OK
,
allresult
.
getStatus
());
assertEquals
(
0
,
allresult
.
getWroteOffset
());
...
...
@@ -214,9 +222,11 @@ public class AppendCallbackTest {
messageExtBatch
.
setStoreHost
(
new
InetSocketAddress
(
"::1"
,
124
));
messageExtBatch
.
setBody
(
MessageDecoder
.
encodeMessages
(
messages
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
PutMessageContext
putMessageContext
=
new
PutMessageContext
(
topic
+
"-"
+
queue
);
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
,
putMessageContext
));
ByteBuffer
buff
=
ByteBuffer
.
allocate
(
1024
*
10
);
AppendMessageResult
allresult
=
callback
.
doAppend
(
0
,
buff
,
1024
*
10
,
messageExtBatch
);
AppendMessageResult
allresult
=
callback
.
doAppend
(
0
,
buff
,
1024
*
10
,
messageExtBatch
,
putMessageContext
);
assertEquals
(
AppendMessageStatus
.
PUT_OK
,
allresult
.
getStatus
());
assertEquals
(
0
,
allresult
.
getWroteOffset
());
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
浏览文件 @
d21266d9
...
...
@@ -164,9 +164,9 @@ public class UpdateAccessConfigSubCommand implements SubCommand {
String
clusterName
=
commandLine
.
getOptionValue
(
'c'
).
trim
();
defaultMQAdminExt
.
start
();
Set
<
String
>
maste
rSet
=
CommandUtil
.
fetchMasterAddrByClusterName
(
defaultMQAdminExt
,
clusterName
);
for
(
String
addr
:
maste
rSet
)
{
Set
<
String
>
brokerAdd
rSet
=
CommandUtil
.
fetchMasterA
ndSlaveA
ddrByClusterName
(
defaultMQAdminExt
,
clusterName
);
for
(
String
addr
:
brokerAdd
rSet
)
{
defaultMQAdminExt
.
createAndUpdatePlainAccessConfig
(
addr
,
accessConfig
);
System
.
out
.
printf
(
"create or update plain access config to %s success.%n"
,
addr
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录