Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
47fad3c1
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
270
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
47fad3c1
编写于
3月 17, 2017
作者:
D
dongeforever
提交者:
dongeforever
6月 06, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53
上级
0d6c56b1
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
1464 addition
and
206 deletion
+1464
-206
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+2
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
...cketmq/broker/processor/AbstractSendMessageProcessor.java
+7
-5
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
...pache/rocketmq/broker/processor/SendMessageProcessor.java
+222
-128
client/src/main/java/org/apache/rocketmq/client/Validators.java
.../src/main/java/org/apache/rocketmq/client/Validators.java
+1
-0
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
...java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+33
-23
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
.../rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+19
-11
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
...rg/apache/rocketmq/client/producer/DefaultMQProducer.java
+38
-0
client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
.../java/org/apache/rocketmq/client/producer/MQProducer.java
+14
-0
common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
...main/java/org/apache/rocketmq/common/TopicFilterType.java
+1
-0
common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
...java/org/apache/rocketmq/common/message/MessageBatch.java
+73
-0
common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
...va/org/apache/rocketmq/common/message/MessageDecoder.java
+103
-0
common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
...n/java/org/apache/rocketmq/common/message/MessageExt.java
+1
-1
common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
...a/org/apache/rocketmq/common/message/MessageExtBatch.java
+42
-0
common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
...java/org/apache/rocketmq/common/protocol/RequestCode.java
+3
-0
common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
...etmq/common/protocol/header/SendMessageRequestHeader.java
+10
-0
common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
...mq/common/protocol/header/SendMessageRequestHeaderV2.java
+14
-0
common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
...est/java/org/apache/rocketmq/common/MessageBatchTest.java
+70
-0
common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
...a/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
+81
-0
store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
...java/org/apache/rocketmq/store/AppendMessageCallback.java
+14
-1
store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
...n/java/org/apache/rocketmq/store/AppendMessageResult.java
+11
-0
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+323
-21
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
...src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+57
-0
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
...e/src/main/java/org/apache/rocketmq/store/MappedFile.java
+20
-12
store/src/main/java/org/apache/rocketmq/store/MessageStore.java
...src/main/java/org/apache/rocketmq/store/MessageStore.java
+3
-0
store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
...src/main/java/org/apache/rocketmq/store/RunningFlags.java
+11
-0
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
.../org/apache/rocketmq/store/config/MessageStoreConfig.java
+2
-0
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
...a/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+6
-2
store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
...st/java/org/apache/rocketmq/store/AppendCallbackTest.java
+150
-0
test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
...ache/rocketmq/test/client/producer/batch/BatchSendIT.java
+131
-0
test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
...est/client/producer/exception/msg/MessageExceptionIT.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
47fad3c1
...
...
@@ -374,9 +374,11 @@ public class BrokerController {
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_BATCH_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CONSUMER_SEND_MSG_BACK
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_BATCH_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CONSUMER_SEND_MSG_BACK
,
sendProcessor
,
this
.
sendMessageExecutor
);
/**
* PullMessageProcessor
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
浏览文件 @
47fad3c1
...
...
@@ -17,11 +17,6 @@
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Random
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageContext
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageHook
;
...
...
@@ -51,6 +46,12 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Random
;
public
abstract
class
AbstractSendMessageProcessor
implements
NettyRequestProcessor
{
protected
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
...
...
@@ -279,6 +280,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
SendMessageRequestHeaderV2
requestHeaderV2
=
null
;
SendMessageRequestHeader
requestHeader
=
null
;
switch
(
request
.
getCode
())
{
case
RequestCode
.
SEND_BATCH_MESSAGE
:
case
RequestCode
.
SEND_MESSAGE_V2
:
requestHeaderV2
=
(
SendMessageRequestHeaderV2
)
request
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
浏览文件 @
47fad3c1
...
...
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader
;
...
...
@@ -72,7 +73,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
mqtraceContext
=
buildMsgContext
(
ctx
,
requestHeader
);
this
.
executeSendMessageHookBefore
(
ctx
,
request
,
mqtraceContext
);
final
RemotingCommand
response
=
this
.
sendMessage
(
ctx
,
request
,
mqtraceContext
,
requestHeader
);
RemotingCommand
response
;
if
(
requestHeader
.
isBatch
())
{
response
=
this
.
sendBatchMessage
(
ctx
,
request
,
mqtraceContext
,
requestHeader
);
}
else
{
response
=
this
.
sendMessage
(
ctx
,
request
,
mqtraceContext
,
requestHeader
);
}
this
.
executeSendMessageHookAfter
(
response
,
mqtraceContext
);
return
response
;
...
...
@@ -238,6 +245,50 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return
response
;
}
private
boolean
handleRetryAndDLQ
(
SendMessageRequestHeader
requestHeader
,
RemotingCommand
response
,
RemotingCommand
request
,
MessageExt
msg
,
TopicConfig
topicConfig
)
{
String
newTopic
=
requestHeader
.
getTopic
();
if
(
null
!=
newTopic
&&
newTopic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
String
groupName
=
newTopic
.
substring
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
.
length
());
SubscriptionGroupConfig
subscriptionGroupConfig
=
this
.
brokerController
.
getSubscriptionGroupManager
().
findSubscriptionGroupConfig
(
groupName
);
if
(
null
==
subscriptionGroupConfig
)
{
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_GROUP_NOT_EXIST
);
response
.
setRemark
(
"subscription group not exist, "
+
groupName
+
" "
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
SUBSCRIPTION_GROUP_NOT_EXIST
));
return
false
;
}
int
maxReconsumeTimes
=
subscriptionGroupConfig
.
getRetryMaxTimes
();
if
(
request
.
getVersion
()
>=
MQVersion
.
Version
.
V3_4_9
.
ordinal
())
{
maxReconsumeTimes
=
requestHeader
.
getMaxReconsumeTimes
();
}
int
reconsumeTimes
=
requestHeader
.
getReconsumeTimes
()
==
null
?
0
:
requestHeader
.
getReconsumeTimes
();
if
(
reconsumeTimes
>=
maxReconsumeTimes
)
{
newTopic
=
MixAll
.
getDLQTopic
(
groupName
);
int
queueIdInt
=
Math
.
abs
(
this
.
random
.
nextInt
()
%
99999999
)
%
DLQ_NUMS_PER_GROUP
;
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
createTopicInSendMessageBackMethod
(
newTopic
,
//
DLQ_NUMS_PER_GROUP
,
//
PermName
.
PERM_WRITE
,
0
);
msg
.
setTopic
(
newTopic
);
msg
.
setQueueId
(
queueIdInt
);
if
(
null
==
topicConfig
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"topic["
+
newTopic
+
"] not exist"
);
return
false
;
}
}
}
int
sysFlag
=
requestHeader
.
getSysFlag
();
if
(
TopicFilterType
.
MULTI_TAG
==
topicConfig
.
getTopicFilterType
())
{
sysFlag
|=
MessageSysFlag
.
MULTI_TAGS_FLAG
;
}
msg
.
setSysFlag
(
sysFlag
);
return
true
;
}
private
RemotingCommand
sendMessage
(
final
ChannelHandlerContext
ctx
,
//
final
RemotingCommand
request
,
//
final
SendMessageContext
sendMessageContext
,
//
...
...
@@ -251,9 +302,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response
.
addExtField
(
MessageConst
.
PROPERTY_MSG_REGION
,
this
.
brokerController
.
getBrokerConfig
().
getRegionId
());
response
.
addExtField
(
MessageConst
.
PROPERTY_TRACE_SWITCH
,
String
.
valueOf
(
this
.
brokerController
.
getBrokerConfig
().
isTraceOn
()));
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"receive SendMessage request command, {}"
,
request
);
}
log
.
debug
(
"receive SendMessage request command, {}"
,
request
);
final
long
startTimstamp
=
this
.
brokerController
.
getBrokerConfig
().
getStartAcceptSendRequestTimeStamp
();
if
(
this
.
brokerController
.
getMessageStore
().
now
()
<
startTimstamp
)
{
...
...
@@ -270,6 +319,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
final
byte
[]
body
=
request
.
getBody
();
int
queueIdInt
=
requestHeader
.
getQueueId
();
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
...
...
@@ -277,53 +328,18 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
queueIdInt
=
Math
.
abs
(
this
.
random
.
nextInt
()
%
99999999
)
%
topicConfig
.
getWriteQueueNums
();
}
int
sysFlag
=
requestHeader
.
getSysFlag
();
MessageExtBrokerInner
msgInner
=
new
MessageExtBrokerInner
();
msgInner
.
setTopic
(
requestHeader
.
getTopic
());
msgInner
.
setQueueId
(
queueIdInt
);
if
(
TopicFilterType
.
MULTI_TAG
==
topicConfig
.
getTopicFilterType
(
))
{
sysFlag
|=
MessageSysFlag
.
MULTI_TAGS_FLAG
;
if
(
!
handleRetryAndDLQ
(
requestHeader
,
response
,
request
,
msgInner
,
topicConfig
))
{
return
response
;
}
String
newTopic
=
requestHeader
.
getTopic
();
if
(
null
!=
newTopic
&&
newTopic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
String
groupName
=
newTopic
.
substring
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
.
length
());
SubscriptionGroupConfig
subscriptionGroupConfig
=
this
.
brokerController
.
getSubscriptionGroupManager
().
findSubscriptionGroupConfig
(
groupName
);
if
(
null
==
subscriptionGroupConfig
)
{
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_GROUP_NOT_EXIST
);
response
.
setRemark
(
"subscription group not exist, "
+
groupName
+
" "
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
SUBSCRIPTION_GROUP_NOT_EXIST
));
return
response
;
}
int
maxReconsumeTimes
=
subscriptionGroupConfig
.
getRetryMaxTimes
();
if
(
request
.
getVersion
()
>=
MQVersion
.
Version
.
V3_4_9
.
ordinal
())
{
maxReconsumeTimes
=
requestHeader
.
getMaxReconsumeTimes
();
}
int
reconsumeTimes
=
requestHeader
.
getReconsumeTimes
()
==
null
?
0
:
requestHeader
.
getReconsumeTimes
();
if
(
reconsumeTimes
>=
maxReconsumeTimes
)
{
newTopic
=
MixAll
.
getDLQTopic
(
groupName
);
queueIdInt
=
Math
.
abs
(
this
.
random
.
nextInt
()
%
99999999
)
%
DLQ_NUMS_PER_GROUP
;
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
createTopicInSendMessageBackMethod
(
newTopic
,
//
DLQ_NUMS_PER_GROUP
,
//
PermName
.
PERM_WRITE
,
0
);
if
(
null
==
topicConfig
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"topic["
+
newTopic
+
"] not exist"
);
return
response
;
}
}
}
MessageExtBrokerInner
msgInner
=
new
MessageExtBrokerInner
();
msgInner
.
setTopic
(
newTopic
);
msgInner
.
setBody
(
body
);
msgInner
.
setFlag
(
requestHeader
.
getFlag
());
MessageAccessor
.
setProperties
(
msgInner
,
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
()));
msgInner
.
setPropertiesString
(
requestHeader
.
getProperties
());
msgInner
.
setTagsCode
(
MessageExtBrokerInner
.
tagsString2tagsCode
(
topicConfig
.
getTopicFilterType
(),
msgInner
.
getTags
()));
msgInner
.
setQueueId
(
queueIdInt
);
msgInner
.
setSysFlag
(
sysFlag
);
msgInner
.
setBornTimestamp
(
requestHeader
.
getBornTimestamp
());
msgInner
.
setBornHost
(
ctx
.
channel
().
remoteAddress
());
msgInner
.
setStoreHost
(
this
.
getStoreHost
());
...
...
@@ -340,105 +356,183 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
if
(
putMessageResult
!=
null
)
{
boolean
sendOK
=
false
;
switch
(
putMessageResult
.
getPutMessageStatus
())
{
// Success
case
PUT_OK:
sendOK
=
true
;
response
.
setCode
(
ResponseCode
.
SUCCESS
);
break
;
case
FLUSH_DISK_TIMEOUT:
response
.
setCode
(
ResponseCode
.
FLUSH_DISK_TIMEOUT
);
sendOK
=
true
;
break
;
case
FLUSH_SLAVE_TIMEOUT:
response
.
setCode
(
ResponseCode
.
FLUSH_SLAVE_TIMEOUT
);
sendOK
=
true
;
break
;
case
SLAVE_NOT_AVAILABLE:
response
.
setCode
(
ResponseCode
.
SLAVE_NOT_AVAILABLE
);
sendOK
=
true
;
break
;
return
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
msgInner
,
responseHeader
,
sendMessageContext
,
ctx
,
queueIdInt
);
// Failed
case
CREATE_MAPEDFILE_FAILED:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"create mapped file failed, server is busy or broken."
);
break
;
case
MESSAGE_ILLEGAL:
case
PROPERTIES_SIZE_EXCEEDED:
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."
);
break
;
case
SERVICE_NOT_AVAILABLE:
response
.
setCode
(
ResponseCode
.
SERVICE_NOT_AVAILABLE
);
response
.
setRemark
(
"service not available now, maybe disk full, "
+
diskUtil
()
+
", maybe your broker machine memory too small."
);
break
;
case
OS_PAGECACHE_BUSY:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"[PC_SYNCHRONIZED]broker busy, start flow control for a while"
);
break
;
case
UNKNOWN_ERROR:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"UNKNOWN_ERROR"
);
break
;
default
:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"UNKNOWN_ERROR DEFAULT"
);
break
;
}
}
String
owner
=
request
.
getExtFields
().
get
(
BrokerStatsManager
.
COMMERCIAL_OWNER
);
if
(
sendOK
)
{
this
.
brokerController
.
getBrokerStatsManager
().
incTopicPutNums
(
msgInner
.
getTopic
());
this
.
brokerController
.
getBrokerStatsManager
().
incTopicPutSize
(
msgInner
.
getTopic
(),
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
());
this
.
brokerController
.
getBrokerStatsManager
().
incBrokerPutNums
();
private
RemotingCommand
handlePutMessageResult
(
PutMessageResult
putMessageResult
,
RemotingCommand
response
,
RemotingCommand
request
,
MessageExt
msg
,
SendMessageResponseHeader
responseHeader
,
SendMessageContext
sendMessageContext
,
ChannelHandlerContext
ctx
,
int
queueIdInt
)
{
if
(
putMessageResult
==
null
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"store putMessage return null"
);
return
response
;
}
boolean
sendOK
=
false
;
switch
(
putMessageResult
.
getPutMessageStatus
())
{
// Success
case
PUT_OK:
sendOK
=
true
;
response
.
setCode
(
ResponseCode
.
SUCCESS
);
break
;
case
FLUSH_DISK_TIMEOUT:
response
.
setCode
(
ResponseCode
.
FLUSH_DISK_TIMEOUT
);
sendOK
=
true
;
break
;
case
FLUSH_SLAVE_TIMEOUT:
response
.
setCode
(
ResponseCode
.
FLUSH_SLAVE_TIMEOUT
);
sendOK
=
true
;
break
;
case
SLAVE_NOT_AVAILABLE:
response
.
setCode
(
ResponseCode
.
SLAVE_NOT_AVAILABLE
);
sendOK
=
true
;
break
;
// Failed
case
CREATE_MAPEDFILE_FAILED:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"create mapped file failed, server is busy or broken."
);
break
;
case
MESSAGE_ILLEGAL:
case
PROPERTIES_SIZE_EXCEEDED:
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."
);
break
;
case
SERVICE_NOT_AVAILABLE:
response
.
setCode
(
ResponseCode
.
SERVICE_NOT_AVAILABLE
);
response
.
setRemark
(
"service not available now, maybe disk full, "
+
diskUtil
()
+
", maybe your broker machine memory too small."
);
break
;
case
OS_PAGECACHE_BUSY:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"[PC_SYNCHRONIZED]broker busy, start flow control for a while"
);
break
;
case
UNKNOWN_ERROR:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"UNKNOWN_ERROR"
);
break
;
default
:
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"UNKNOWN_ERROR DEFAULT"
);
break
;
}
String
owner
=
request
.
getExtFields
().
get
(
BrokerStatsManager
.
COMMERCIAL_OWNER
);
if
(
sendOK
)
{
response
.
setRemark
(
null
);
this
.
brokerController
.
getBrokerStatsManager
().
incTopicPutNums
(
msg
.
getTopic
(),
putMessageResult
.
getAppendMessageResult
().
getMsgNum
(),
1
);
this
.
brokerController
.
getBrokerStatsManager
().
incTopicPutSize
(
msg
.
getTopic
(),
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
());
this
.
brokerController
.
getBrokerStatsManager
().
incBrokerPutNums
(
putMessageResult
.
getAppendMessageResult
().
getMsgNum
());
responseHeader
.
setMsgId
(
putMessageResult
.
getAppendMessageResult
().
getMsgId
());
responseHeader
.
setQueueId
(
queueIdInt
);
responseHeader
.
setQueueOffset
(
putMessageResult
.
getAppendMessageResult
().
getLogicsOffset
());
response
.
setRemark
(
null
);
doResponse
(
ctx
,
request
,
response
);
responseHeader
.
setMsgId
(
putMessageResult
.
getAppendMessageResult
().
getMsgId
());
responseHeader
.
setQueueId
(
queueIdInt
);
responseHeader
.
setQueueOffset
(
putMessageResult
.
getAppendMessageResult
().
getLogicsOffset
());
if
(
hasSendMessageHook
())
{
sendMessageContext
.
setMsgId
(
responseHeader
.
getMsgId
());
sendMessageContext
.
setQueueId
(
responseHeader
.
getQueueId
());
sendMessageContext
.
setQueueOffset
(
responseHeader
.
getQueueOffset
());
doResponse
(
ctx
,
request
,
response
);
int
commercialBaseCount
=
brokerController
.
getBrokerConfig
().
getCommercialBaseCount
();
int
wroteSize
=
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
();
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
)
*
commercialBaseCount
;
if
(
hasSendMessageHook
())
{
sendMessageContext
.
setMsgId
(
responseHeader
.
getMsgId
());
sendMessageContext
.
setQueueId
(
responseHeader
.
getQueueId
());
sendMessageContext
.
setQueueOffset
(
responseHeader
.
getQueueOffset
());
sendMessageContext
.
setCommercialSendStats
(
BrokerStatsManager
.
StatsType
.
SEND_SUCCESS
);
sendMessageContext
.
setCommercialSendTimes
(
incValue
);
sendMessageContext
.
setCommercialSendSize
(
wroteSize
);
sendMessageContext
.
setCommercialOwner
(
owner
);
}
return
null
;
}
else
{
if
(
hasSendMessageHook
())
{
int
wroteSize
=
request
.
getBody
().
length
;
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
);
sendMessageContext
.
setCommercialSendStats
(
BrokerStatsManager
.
StatsType
.
SEND_FAILURE
);
sendMessageContext
.
setCommercialSendTimes
(
incValue
);
sendMessageContext
.
setCommercialSendSize
(
wroteSize
);
sendMessageContext
.
setCommercialOwner
(
owner
);
}
int
commercialBaseCount
=
brokerController
.
getBrokerConfig
().
getCommercialBaseCount
();
int
wroteSize
=
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
();
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
)
*
commercialBaseCount
;
sendMessageContext
.
setCommercialSendStats
(
BrokerStatsManager
.
StatsType
.
SEND_SUCCESS
);
sendMessageContext
.
setCommercialSendTimes
(
incValue
);
sendMessageContext
.
setCommercialSendSize
(
wroteSize
);
sendMessageContext
.
setCommercialOwner
(
owner
);
}
return
null
;
}
else
{
if
(
hasSendMessageHook
())
{
int
wroteSize
=
request
.
getBody
().
length
;
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
);
sendMessageContext
.
setCommercialSendStats
(
BrokerStatsManager
.
StatsType
.
SEND_FAILURE
);
sendMessageContext
.
setCommercialSendTimes
(
incValue
);
sendMessageContext
.
setCommercialSendSize
(
wroteSize
);
sendMessageContext
.
setCommercialOwner
(
owner
);
}
}
return
response
;
}
private
RemotingCommand
sendBatchMessage
(
final
ChannelHandlerContext
ctx
,
//
final
RemotingCommand
request
,
//
final
SendMessageContext
sendMessageContext
,
//
final
SendMessageRequestHeader
requestHeader
)
throws
RemotingCommandException
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
SendMessageResponseHeader
.
class
);
final
SendMessageResponseHeader
responseHeader
=
(
SendMessageResponseHeader
)
response
.
readCustomHeader
();
response
.
setOpaque
(
request
.
getOpaque
());
response
.
addExtField
(
MessageConst
.
PROPERTY_MSG_REGION
,
this
.
brokerController
.
getBrokerConfig
().
getRegionId
());
response
.
addExtField
(
MessageConst
.
PROPERTY_TRACE_SWITCH
,
String
.
valueOf
(
this
.
brokerController
.
getBrokerConfig
().
isTraceOn
()));
log
.
debug
(
"Receive SendMessage request command {}"
,
request
);
final
long
startTimstamp
=
this
.
brokerController
.
getBrokerConfig
().
getStartAcceptSendRequestTimeStamp
();
if
(
this
.
brokerController
.
getMessageStore
().
now
()
<
startTimstamp
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"store putMessage return null"
);
response
.
setRemark
(
String
.
format
(
"broker unable to service, until %s"
,
UtilAll
.
timeMillisToHumanString2
(
startTimstamp
)));
return
response
;
}
response
.
setCode
(-
1
);
super
.
msgCheck
(
ctx
,
requestHeader
,
response
);
if
(
response
.
getCode
()
!=
-
1
)
{
return
response
;
}
int
queueIdInt
=
requestHeader
.
getQueueId
();
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
if
(
queueIdInt
<
0
)
{
queueIdInt
=
Math
.
abs
(
this
.
random
.
nextInt
()
%
99999999
)
%
topicConfig
.
getWriteQueueNums
();
}
if
(
requestHeader
.
getTopic
().
length
()
>
Byte
.
MAX_VALUE
)
{
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
"message topic length too long "
+
requestHeader
.
getTopic
().
length
());
return
response
;
}
if
(
requestHeader
.
getTopic
()
!=
null
&&
requestHeader
.
getTopic
().
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
"batch request does not support retry group "
+
requestHeader
.
getTopic
());
return
response
;
}
MessageExtBatch
messageExtBatch
=
new
MessageExtBatch
();
messageExtBatch
.
setTopic
(
requestHeader
.
getTopic
());
messageExtBatch
.
setQueueId
(
queueIdInt
);
int
sysFlag
=
requestHeader
.
getSysFlag
();
if
(
TopicFilterType
.
MULTI_TAG
==
topicConfig
.
getTopicFilterType
())
{
sysFlag
|=
MessageSysFlag
.
MULTI_TAGS_FLAG
;
}
messageExtBatch
.
setSysFlag
(
sysFlag
);
messageExtBatch
.
setFlag
(
requestHeader
.
getFlag
());
MessageAccessor
.
setProperties
(
messageExtBatch
,
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
()));
messageExtBatch
.
setBody
(
request
.
getBody
());
messageExtBatch
.
setBornTimestamp
(
requestHeader
.
getBornTimestamp
());
messageExtBatch
.
setBornHost
(
ctx
.
channel
().
remoteAddress
());
messageExtBatch
.
setStoreHost
(
this
.
getStoreHost
());
messageExtBatch
.
setReconsumeTimes
(
requestHeader
.
getReconsumeTimes
()
==
null
?
0
:
requestHeader
.
getReconsumeTimes
());
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessages
(
messageExtBatch
);
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
messageExtBatch
,
responseHeader
,
sendMessageContext
,
ctx
,
queueIdInt
);
return
response
;
}
...
...
client/src/main/java/org/apache/rocketmq/client/Validators.java
浏览文件 @
47fad3c1
...
...
@@ -95,6 +95,7 @@ public class Validators {
}
// topic
Validators
.
checkTopic
(
msg
.
getTopic
());
// body
if
(
null
==
msg
.
getBody
())
{
throw
new
MQClientException
(
ResponseCode
.
MESSAGE_ILLEGAL
,
"the message body is null"
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
浏览文件 @
47fad3c1
...
...
@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
import
java.io.UnsupportedEncodingException
;
import
java.nio.ByteBuffer
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Set
;
import
java.util.Iterator
;
import
java.util.Collections
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.consumer.PullCallback
;
...
...
@@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats;
import
org.apache.rocketmq.common.admin.TopicStatsTable
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageClientIDSetter
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageBatch
;
import
org.apache.rocketmq.common.namesrv.TopAddressing
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
...
...
@@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
org.slf4j.Logger
;
public
class
MQClientAPIImpl
{
private
final
static
Logger
log
=
ClientLogger
.
getLog
();
...
...
@@ -278,14 +280,14 @@ public class MQClientAPIImpl {
}
public
SendResult
sendMessage
(
//
final
String
addr
,
// 1
final
String
brokerName
,
// 2
final
Message
msg
,
// 3
final
SendMessageRequestHeader
requestHeader
,
// 4
final
long
timeoutMillis
,
// 5
final
CommunicationMode
communicationMode
,
// 6
final
SendMessageContext
context
,
// 7
final
DefaultMQProducerImpl
producer
// 8
final
String
addr
,
// 1
final
String
brokerName
,
// 2
final
Message
msg
,
// 3
final
SendMessageRequestHeader
requestHeader
,
// 4
final
long
timeoutMillis
,
// 5
final
CommunicationMode
communicationMode
,
// 6
final
SendMessageContext
context
,
// 7
final
DefaultMQProducerImpl
producer
// 8
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
sendMessage
(
addr
,
brokerName
,
msg
,
requestHeader
,
timeoutMillis
,
communicationMode
,
null
,
null
,
null
,
0
,
context
,
producer
);
}
...
...
@@ -305,9 +307,9 @@ public class MQClientAPIImpl {
final
DefaultMQProducerImpl
producer
// 12
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
{
RemotingCommand
request
=
null
;
if
(
sendSmartMsg
)
{
if
(
sendSmartMsg
||
msg
instanceof
MessageBatch
)
{
SendMessageRequestHeaderV2
requestHeaderV2
=
SendMessageRequestHeaderV2
.
createSendMessageRequestHeaderV2
(
requestHeader
);
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SEND_MESSAGE_V2
,
requestHeaderV2
);
request
=
RemotingCommand
.
createRequestCommand
(
msg
instanceof
MessageBatch
?
RequestCode
.
SEND_BATCH_MESSAGE
:
RequestCode
.
SEND_MESSAGE_V2
,
requestHeaderV2
);
}
else
{
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SEND_MESSAGE
,
requestHeader
);
}
...
...
@@ -334,11 +336,11 @@ public class MQClientAPIImpl {
}
private
SendResult
sendMessageSync
(
//
final
String
addr
,
//
final
String
brokerName
,
//
final
Message
msg
,
//
final
long
timeoutMillis
,
//
final
RemotingCommand
request
//
final
String
addr
,
//
final
String
brokerName
,
//
final
Message
msg
,
//
final
long
timeoutMillis
,
//
final
RemotingCommand
request
//
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
{
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
addr
,
request
,
timeoutMillis
);
assert
response
!=
null
;
...
...
@@ -507,8 +509,16 @@ public class MQClientAPIImpl {
MessageQueue
messageQueue
=
new
MessageQueue
(
msg
.
getTopic
(),
brokerName
,
responseHeader
.
getQueueId
());
String
uniqMsgId
=
MessageClientIDSetter
.
getUniqID
(
msg
);
if
(
msg
instanceof
MessageBatch
)
{
StringBuilder
sb
=
new
StringBuilder
();
for
(
Message
message
:
(
MessageBatch
)
msg
)
{
sb
.
append
(
sb
.
length
()
==
0
?
""
:
","
).
append
(
MessageClientIDSetter
.
getUniqID
(
message
));
}
uniqMsgId
=
sb
.
toString
();
}
SendResult
sendResult
=
new
SendResult
(
sendStatus
,
MessageClientIDSetter
.
getUniqID
(
msg
)
,
uniqMsgId
,
responseHeader
.
getMsgId
(),
messageQueue
,
responseHeader
.
getQueueOffset
());
sendResult
.
setTransactionId
(
responseHeader
.
getTransactionId
());
String
regionId
=
response
.
getExtFields
().
get
(
MessageConst
.
PROPERTY_MSG_REGION
);
...
...
@@ -1452,7 +1462,7 @@ public class MQClientAPIImpl {
}
public
Map
<
String
,
Map
<
MessageQueue
,
Long
>>
invokeBrokerToGetConsumerStatus
(
final
String
addr
,
final
String
topic
,
final
String
group
,
final
String
clientAddr
,
final
long
timeoutMillis
)
throws
RemotingException
,
MQClientException
,
InterruptedException
{
final
String
clientAddr
,
final
long
timeoutMillis
)
throws
RemotingException
,
MQClientException
,
InterruptedException
{
GetConsumerStatusRequestHeader
requestHeader
=
new
GetConsumerStatusRequestHeader
();
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setGroup
(
group
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
浏览文件 @
47fad3c1
...
...
@@ -30,6 +30,16 @@ import java.util.concurrent.ExecutorService;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageClientIDSetter
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageBatch
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageType
;
import
org.apache.rocketmq.common.message.MessageId
;
import
org.apache.rocketmq.client.QueryResult
;
import
org.apache.rocketmq.client.Validators
;
import
org.apache.rocketmq.client.common.ClientErrorCode
;
...
...
@@ -58,15 +68,6 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.ServiceState
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageClientIDSetter
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageId
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageType
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader
;
...
...
@@ -595,8 +596,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
byte
[]
prevBody
=
msg
.
getBody
();
try
{
MessageClientIDSetter
.
setUniqID
(
msg
);
//for MessageBatch,ID has been set in the generating process
if
(!(
msg
instanceof
MessageBatch
))
{
MessageClientIDSetter
.
setUniqID
(
msg
);
}
int
sysFlag
=
0
;
if
(
this
.
tryToCompressMessage
(
msg
))
{
...
...
@@ -652,6 +655,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader
.
setProperties
(
MessageDecoder
.
messageProperties2String
(
msg
.
getProperties
()));
requestHeader
.
setReconsumeTimes
(
0
);
requestHeader
.
setUnitMode
(
this
.
isUnitMode
());
requestHeader
.
setBatch
(
msg
instanceof
MessageBatch
);
if
(
requestHeader
.
getTopic
().
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
String
reconsumeTimes
=
MessageAccessor
.
getReconsumeTime
(
msg
);
if
(
reconsumeTimes
!=
null
)
{
...
...
@@ -737,6 +741,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private
boolean
tryToCompressMessage
(
final
Message
msg
)
{
if
(
msg
instanceof
MessageBatch
)
{
//batch dose not support compressing right now
return
false
;
}
byte
[]
body
=
msg
.
getBody
();
if
(
body
!=
null
)
{
if
(
body
.
length
>=
this
.
defaultMQProducer
.
getCompressMsgBodyOverHowmuch
())
{
...
...
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
浏览文件 @
47fad3c1
...
...
@@ -16,14 +16,18 @@
*/
package
org.apache.rocketmq.client.producer
;
import
java.util.Collection
;
import
java.util.List
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.QueryResult
;
import
org.apache.rocketmq.client.Validators
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageBatch
;
import
org.apache.rocketmq.common.message.MessageClientIDSetter
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageId
;
...
...
@@ -577,6 +581,40 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return
this
.
defaultMQProducerImpl
.
queryMessageByUniqKey
(
topic
,
msgId
);
}
@Override
public
SendResult
send
(
Collection
<
Message
>
msgs
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
batch
(
msgs
));
}
@Override
public
SendResult
send
(
Collection
<
Message
>
msgs
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
batch
(
msgs
),
timeout
);
}
@Override
public
SendResult
send
(
Collection
<
Message
>
msgs
,
MessageQueue
messageQueue
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
batch
(
msgs
),
messageQueue
);
}
@Override
public
SendResult
send
(
Collection
<
Message
>
msgs
,
MessageQueue
messageQueue
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
batch
(
msgs
),
messageQueue
,
timeout
);
}
private
MessageBatch
batch
(
Collection
<
Message
>
msgs
)
throws
MQClientException
{
MessageBatch
msgBatch
;
try
{
msgBatch
=
MessageBatch
.
generateFromList
(
msgs
);
for
(
Message
message
:
msgBatch
)
{
Validators
.
checkMessage
(
message
,
this
);
MessageClientIDSetter
.
setUniqID
(
message
);
}
msgBatch
.
setBody
(
msgBatch
.
encode
());
}
catch
(
Exception
e
)
{
throw
new
MQClientException
(
"Failed to initiate the MessageBatch"
,
e
);
}
return
msgBatch
;
}
public
String
getProducerGroup
()
{
return
producerGroup
;
}
...
...
client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
浏览文件 @
47fad3c1
...
...
@@ -16,6 +16,7 @@
*/
package
org.apache.rocketmq.client.producer
;
import
java.util.Collection
;
import
java.util.List
;
import
org.apache.rocketmq.client.MQAdmin
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
...
...
@@ -81,4 +82,17 @@ public interface MQProducer extends MQAdmin {
TransactionSendResult
sendMessageInTransaction
(
final
Message
msg
,
final
LocalTransactionExecuter
tranExecuter
,
final
Object
arg
)
throws
MQClientException
;
//for batch
SendResult
send
(
final
Collection
<
Message
>
msgs
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
SendResult
send
(
final
Collection
<
Message
>
msgs
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
SendResult
send
(
final
Collection
<
Message
>
msgs
,
final
MessageQueue
mq
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
SendResult
send
(
final
Collection
<
Message
>
msgs
,
final
MessageQueue
mq
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
}
common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
浏览文件 @
47fad3c1
...
...
@@ -19,4 +19,5 @@ package org.apache.rocketmq.common;
public
enum
TopicFilterType
{
SINGLE_TAG
,
MULTI_TAG
}
common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
0 → 100644
浏览文件 @
47fad3c1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.message
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.Iterator
;
import
java.util.List
;
import
org.apache.rocketmq.common.MixAll
;
public
class
MessageBatch
extends
Message
implements
Iterable
<
Message
>
{
private
static
final
long
serialVersionUID
=
621335151046335557L
;
private
final
List
<
Message
>
messages
;
private
MessageBatch
(
List
<
Message
>
messages
)
{
this
.
messages
=
messages
;
}
public
byte
[]
encode
()
{
return
MessageDecoder
.
encodeMessages
(
messages
);
}
public
Iterator
<
Message
>
iterator
()
{
return
messages
.
iterator
();
}
public
static
MessageBatch
generateFromList
(
Collection
<
Message
>
messages
)
{
assert
messages
!=
null
;
assert
messages
.
size
()
>
0
;
List
<
Message
>
messageList
=
new
ArrayList
<
Message
>(
messages
.
size
());
Message
first
=
null
;
for
(
Message
message
:
messages
)
{
if
(
message
.
getDelayTimeLevel
()
>
0
)
{
throw
new
UnsupportedOperationException
(
"TimeDelayLevel in not supported for batching"
);
}
if
(
message
.
getTopic
().
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
throw
new
UnsupportedOperationException
(
"Retry Group is not supported for batching"
);
}
if
(
first
==
null
)
{
first
=
message
;
}
else
{
if
(!
first
.
getTopic
().
equals
(
message
.
getTopic
()))
{
throw
new
UnsupportedOperationException
(
"The topic of the messages in one batch should be the same"
);
}
if
(
first
.
isWaitStoreMsgOK
()
!=
message
.
isWaitStoreMsgOK
())
{
throw
new
UnsupportedOperationException
(
"The waitStoreMsgOK of the messages in one batch should the same"
);
}
}
messageList
.
add
(
message
);
}
MessageBatch
messageBatch
=
new
MessageBatch
(
messageList
);
messageBatch
.
setTopic
(
first
.
getTopic
());
messageBatch
.
setWaitStoreMsgOK
(
first
.
isWaitStoreMsgOK
());
return
messageBatch
;
}
}
common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
浏览文件 @
47fad3c1
...
...
@@ -200,6 +200,8 @@ public class MessageDecoder {
return
byteBuffer
.
array
();
}
public
static
MessageExt
decode
(
java
.
nio
.
ByteBuffer
byteBuffer
,
final
boolean
readBody
,
final
boolean
deCompressBody
)
{
return
decode
(
byteBuffer
,
readBody
,
deCompressBody
,
false
);
...
...
@@ -372,4 +374,105 @@ public class MessageDecoder {
return
map
;
}
public
static
byte
[]
encodeMessage
(
Message
message
)
{
//only need flag, body, properties
byte
[]
body
=
message
.
getBody
();
int
bodyLen
=
body
.
length
;
String
properties
=
messageProperties2String
(
message
.
getProperties
());
byte
[]
propertiesBytes
=
properties
.
getBytes
(
CHARSET_UTF8
);
//note properties length must not more than Short.MAX
short
propertiesLength
=
(
short
)
propertiesBytes
.
length
;
int
sysFlag
=
message
.
getFlag
();
int
storeSize
=
4
// 1 TOTALSIZE
+
4
// 2 MAGICCOD
+
4
// 3 BODYCRC
+
4
// 4 FLAG
+
4
+
bodyLen
// 4 BODY
+
2
+
propertiesLength
;
ByteBuffer
byteBuffer
=
ByteBuffer
.
allocate
(
storeSize
);
// 1 TOTALSIZE
byteBuffer
.
putInt
(
storeSize
);
// 2 MAGICCODE
byteBuffer
.
putInt
(
0
);
// 3 BODYCRC
byteBuffer
.
putInt
(
0
);
// 4 FLAG
int
flag
=
message
.
getFlag
();
byteBuffer
.
putInt
(
flag
);
// 5 BODY
byteBuffer
.
putInt
(
bodyLen
);
byteBuffer
.
put
(
body
);
// 6 properties
byteBuffer
.
putShort
(
propertiesLength
);
byteBuffer
.
put
(
propertiesBytes
);
return
byteBuffer
.
array
();
}
public
static
Message
decodeMessage
(
ByteBuffer
byteBuffer
)
throws
Exception
{
Message
message
=
new
Message
();
// 1 TOTALSIZE
byteBuffer
.
getInt
();
// 2 MAGICCODE
byteBuffer
.
getInt
();
// 3 BODYCRC
byteBuffer
.
getInt
();
// 4 FLAG
int
flag
=
byteBuffer
.
getInt
();
message
.
setFlag
(
flag
);
// 5 BODY
int
bodyLen
=
byteBuffer
.
getInt
();
byte
[]
body
=
new
byte
[
bodyLen
];
byteBuffer
.
get
(
body
);
message
.
setBody
(
body
);
// 6 properties
short
propertiesLen
=
byteBuffer
.
getShort
();
byte
[]
propertiesBytes
=
new
byte
[
propertiesLen
];
byteBuffer
.
get
(
propertiesBytes
);
message
.
setProperties
(
string2messageProperties
(
new
String
(
propertiesBytes
,
CHARSET_UTF8
)));
return
message
;
}
public
static
byte
[]
encodeMessages
(
List
<
Message
>
messages
)
{
//TO DO refactor, accumulate in one buffer, avoid copies
List
<
byte
[]>
encodedMessages
=
new
ArrayList
<
byte
[]>(
messages
.
size
());
int
allSize
=
0
;
for
(
Message
message:
messages
)
{
byte
[]
tmp
=
encodeMessage
(
message
);
encodedMessages
.
add
(
tmp
);
allSize
+=
tmp
.
length
;
}
byte
[]
allBytes
=
new
byte
[
allSize
];
int
pos
=
0
;
for
(
byte
[]
bytes
:
encodedMessages
)
{
System
.
arraycopy
(
bytes
,
0
,
allBytes
,
pos
,
bytes
.
length
);
pos
+=
bytes
.
length
;
}
return
allBytes
;
}
public
static
List
<
Message
>
decodeMessages
(
ByteBuffer
byteBuffer
)
throws
Exception
{
//TO DO add a callback for processing, avoid creating lists
List
<
Message
>
msgs
=
new
ArrayList
<
Message
>();
while
(
byteBuffer
.
hasRemaining
())
{
Message
msg
=
decodeMessage
(
byteBuffer
);
msgs
.
add
(
msg
);
}
return
msgs
;
}
}
common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
浏览文件 @
47fad3c1
...
...
@@ -64,7 +64,7 @@ public class MessageExt extends Message {
return
TopicFilterType
.
SINGLE_TAG
;
}
p
rivate
static
ByteBuffer
socketAddress2ByteBuffer
(
final
SocketAddress
socketAddress
,
final
ByteBuffer
byteBuffer
)
{
p
ublic
static
ByteBuffer
socketAddress2ByteBuffer
(
final
SocketAddress
socketAddress
,
final
ByteBuffer
byteBuffer
)
{
InetSocketAddress
inetSocketAddress
=
(
InetSocketAddress
)
socketAddress
;
byteBuffer
.
put
(
inetSocketAddress
.
getAddress
().
getAddress
(),
0
,
4
);
byteBuffer
.
putInt
(
inetSocketAddress
.
getPort
());
...
...
common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
0 → 100644
浏览文件 @
47fad3c1
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.message
;
import
java.nio.ByteBuffer
;
public
class
MessageExtBatch
extends
MessageExt
{
private
static
final
long
serialVersionUID
=
-
2353110995348498537L
;
public
ByteBuffer
wrap
()
{
assert
getBody
()
!=
null
;
return
ByteBuffer
.
wrap
(
getBody
(),
0
,
getBody
().
length
);
}
private
ByteBuffer
encodedBuff
;
public
ByteBuffer
getEncodedBuff
()
{
return
encodedBuff
;
}
public
void
setEncodedBuff
(
ByteBuffer
encodedBuff
)
{
this
.
encodedBuff
=
encodedBuff
;
}
}
common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
浏览文件 @
47fad3c1
...
...
@@ -159,4 +159,7 @@ public class RequestCode {
* get config from name server
*/
public
static
final
int
GET_NAMESRV_CONFIG
=
319
;
public
static
final
int
SEND_BATCH_MESSAGE
=
320
;
}
common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
浏览文件 @
47fad3c1
...
...
@@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private
Integer
reconsumeTimes
;
@CFNullable
private
boolean
unitMode
=
false
;
@CFNullable
private
boolean
batch
=
false
;
private
Integer
maxReconsumeTimes
;
@Override
...
...
@@ -149,4 +151,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
public
void
setMaxReconsumeTimes
(
final
Integer
maxReconsumeTimes
)
{
this
.
maxReconsumeTimes
=
maxReconsumeTimes
;
}
public
boolean
isBatch
()
{
return
batch
;
}
public
void
setBatch
(
boolean
batch
)
{
this
.
batch
=
batch
;
}
}
common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
浏览文件 @
47fad3c1
...
...
@@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private
Integer
l
;
// consumeRetryTimes
@CFNullable
private
boolean
m
;
//batch
public
static
SendMessageRequestHeader
createSendMessageRequestHeaderV1
(
final
SendMessageRequestHeaderV2
v2
)
{
SendMessageRequestHeader
v1
=
new
SendMessageRequestHeader
();
v1
.
setProducerGroup
(
v2
.
a
);
...
...
@@ -65,6 +69,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1
.
setReconsumeTimes
(
v2
.
j
);
v1
.
setUnitMode
(
v2
.
k
);
v1
.
setMaxReconsumeTimes
(
v2
.
l
);
v1
.
setBatch
(
v2
.
m
);
return
v1
;
}
...
...
@@ -82,6 +87,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2
.
j
=
v1
.
getReconsumeTimes
();
v2
.
k
=
v1
.
isUnitMode
();
v2
.
l
=
v1
.
getMaxReconsumeTimes
();
v2
.
m
=
v1
.
isBatch
();
return
v2
;
}
...
...
@@ -184,4 +190,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public
void
setL
(
final
Integer
l
)
{
this
.
l
=
l
;
}
public
boolean
isM
()
{
return
m
;
}
public
void
setM
(
boolean
m
)
{
this
.
m
=
m
;
}
}
\ No newline at end of file
common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
0 → 100644
浏览文件 @
47fad3c1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageBatch
;
import
org.junit.Test
;
public
class
MessageBatchTest
{
public
List
<
Message
>
generateMessages
()
{
List
<
Message
>
messages
=
new
ArrayList
<
Message
>();
Message
message1
=
new
Message
(
"topic1"
,
"body"
.
getBytes
());
Message
message2
=
new
Message
(
"topic1"
,
"body"
.
getBytes
());
messages
.
add
(
message1
);
messages
.
add
(
message2
);
return
messages
;
}
@Test
public
void
testGenerate_OK
()
throws
Exception
{
List
<
Message
>
messages
=
generateMessages
();
MessageBatch
.
generateFromList
(
messages
);
}
@Test
(
expected
=
UnsupportedOperationException
.
class
)
public
void
testGenerate_DiffTopic
()
throws
Exception
{
List
<
Message
>
messages
=
generateMessages
();
messages
.
get
(
1
).
setTopic
(
"topic2"
);
MessageBatch
.
generateFromList
(
messages
);
}
@Test
(
expected
=
UnsupportedOperationException
.
class
)
public
void
testGenerate_DiffWaitOK
()
throws
Exception
{
List
<
Message
>
messages
=
generateMessages
();
messages
.
get
(
1
).
setWaitStoreMsgOK
(
false
);
MessageBatch
.
generateFromList
(
messages
);
}
@Test
(
expected
=
UnsupportedOperationException
.
class
)
public
void
testGenerate_Delay
()
throws
Exception
{
List
<
Message
>
messages
=
generateMessages
();
messages
.
get
(
1
).
setDelayTimeLevel
(
1
);
MessageBatch
.
generateFromList
(
messages
);
}
@Test
(
expected
=
UnsupportedOperationException
.
class
)
public
void
testGenerate_Retry
()
throws
Exception
{
List
<
Message
>
messages
=
generateMessages
();
messages
.
get
(
1
).
setTopic
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
+
"topic"
);
MessageBatch
.
generateFromList
(
messages
);
}
}
common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
0 → 100644
浏览文件 @
47fad3c1
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.junit.Test
;
import
java.nio.ByteBuffer
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.List
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
/**
* Created by liuzhendong on 16/12/21.
*/
public
class
MessageEncodeDecodeTest
{
@Test
public
void
testEncodeDecodeSingle
()
throws
Exception
{
Message
message
=
new
Message
(
"topic"
,
"body"
.
getBytes
());
message
.
setFlag
(
12
);
message
.
putUserProperty
(
"key"
,
"value"
);
byte
[]
bytes
=
MessageDecoder
.
encodeMessage
(
message
);
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
bytes
.
length
);
buffer
.
put
(
bytes
);
buffer
.
flip
();
Message
newMessage
=
MessageDecoder
.
decodeMessage
(
buffer
);
assertTrue
(
message
.
getFlag
()
==
newMessage
.
getFlag
());
assertTrue
(
newMessage
.
getProperty
(
"key"
).
equals
(
newMessage
.
getProperty
(
"key"
)));
assertTrue
(
Arrays
.
equals
(
newMessage
.
getBody
(),
message
.
getBody
()));
}
@Test
public
void
testEncodeDecodeList
()
throws
Exception
{
List
<
Message
>
messages
=
new
ArrayList
<
Message
>(
128
);
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
Message
message
=
new
Message
(
"topic"
,
(
"body"
+
i
).
getBytes
());
message
.
setFlag
(
i
);
message
.
putUserProperty
(
"key"
,
"value"
+
i
);
messages
.
add
(
message
);
}
byte
[]
bytes
=
MessageDecoder
.
encodeMessages
(
messages
);
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
bytes
.
length
);
buffer
.
put
(
bytes
);
buffer
.
flip
();
List
<
Message
>
newMsgs
=
MessageDecoder
.
decodeMessages
(
buffer
);
assertTrue
(
newMsgs
.
size
()
==
messages
.
size
());
for
(
int
i
=
0
;
i
<
newMsgs
.
size
();
i
++)
{
Message
message
=
messages
.
get
(
i
);
Message
newMessage
=
newMsgs
.
get
(
i
);
assertTrue
(
message
.
getFlag
()
==
newMessage
.
getFlag
());
assertTrue
(
newMessage
.
getProperty
(
"key"
).
equals
(
newMessage
.
getProperty
(
"key"
)));
assertTrue
(
Arrays
.
equals
(
newMessage
.
getBody
(),
message
.
getBody
()));
}
}
}
store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
浏览文件 @
47fad3c1
...
...
@@ -17,6 +17,7 @@
package
org.apache.rocketmq.store
;
import
java.nio.ByteBuffer
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
/**
* Write messages callback interface
...
...
@@ -32,5 +33,17 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msg
);
final
int
maxBlank
,
final
MessageExtBrokerInner
msg
);
/**
* After batched message serialization, write MapedByteBuffer
*
* @param byteBuffer
* @param maxBlank
* @param messageExtBatch, backed up by a byte array
*
* @return How many bytes to write
*/
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
);
}
store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
浏览文件 @
47fad3c1
...
...
@@ -34,6 +34,8 @@ public class AppendMessageResult {
private
long
logicsOffset
;
private
long
pagecacheRT
=
0
;
private
int
msgNum
=
1
;
public
AppendMessageResult
(
AppendMessageStatus
status
)
{
this
(
status
,
0
,
0
,
""
,
0
,
0
,
0
);
}
...
...
@@ -109,6 +111,14 @@ public class AppendMessageResult {
this
.
logicsOffset
=
logicsOffset
;
}
public
int
getMsgNum
()
{
return
msgNum
;
}
public
void
setMsgNum
(
int
msgNum
)
{
this
.
msgNum
=
msgNum
;
}
@Override
public
String
toString
()
{
return
"AppendMessageResult{"
+
...
...
@@ -119,6 +129,7 @@ public class AppendMessageResult {
", storeTimestamp="
+
storeTimestamp
+
", logicsOffset="
+
logicsOffset
+
", pagecacheRT="
+
pagecacheRT
+
", msgNum="
+
msgNum
+
'}'
;
}
}
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
47fad3c1
...
...
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
import
org.apache.rocketmq.store.config.BrokerRole
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
...
...
@@ -57,6 +58,7 @@ public class CommitLog {
private
final
FlushCommitLogService
commitLogService
;
private
final
AppendMessageCallback
appendMessageCallback
;
private
final
ThreadLocal
<
MessageExtBatchEncoder
>
batchEncoderThreadLocal
;
private
HashMap
<
String
/* topic-queueid */
,
Long
/* offset */
>
topicQueueTable
=
new
HashMap
<
String
,
Long
>(
1024
);
private
volatile
long
confirmOffset
=
-
1L
;
...
...
@@ -81,6 +83,11 @@ public class CommitLog {
this
.
commitLogService
=
new
CommitRealTimeService
();
this
.
appendMessageCallback
=
new
DefaultAppendMessageCallback
(
defaultMessageStore
.
getMessageStoreConfig
().
getMaxMessageSize
());
batchEncoderThreadLocal
=
new
ThreadLocal
<
MessageExtBatchEncoder
>()
{
@Override
protected
MessageExtBatchEncoder
initialValue
()
{
return
new
MessageExtBatchEncoder
(
defaultMessageStore
.
getMessageStoreConfig
().
getMaxMessageSize
());
}
};
}
public
boolean
load
()
{
...
...
@@ -222,7 +229,8 @@ public class CommitLog {
*
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public
DispatchRequest
checkMessageAndReturnSize
(
java
.
nio
.
ByteBuffer
byteBuffer
,
final
boolean
checkCRC
,
final
boolean
readBody
)
{
public
DispatchRequest
checkMessageAndReturnSize
(
java
.
nio
.
ByteBuffer
byteBuffer
,
final
boolean
checkCRC
,
final
boolean
readBody
)
{
try
{
// 1 TOTAL SIZE
int
totalSize
=
byteBuffer
.
getInt
();
...
...
@@ -370,7 +378,7 @@ public class CommitLog {
return
new
DispatchRequest
(-
1
,
false
/* success */
);
}
private
int
calMsgLength
(
int
bodyLength
,
int
topicLength
,
int
propertiesLength
)
{
private
static
int
calMsgLength
(
int
bodyLength
,
int
topicLength
,
int
propertiesLength
)
{
final
int
msgLen
=
4
// 1 TOTALSIZE
+
4
// 2 MAGICCODE
+
4
// 3 BODYCRC
...
...
@@ -633,18 +641,23 @@ public class CommitLog {
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
msg
.
getTopic
()).
incrementAndGet
();
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
topic
).
addAndGet
(
result
.
getWroteBytes
());
GroupCommitRequest
request
=
null
;
handleDiskFlush
(
result
,
putMessageResult
,
msg
);
handleHA
(
result
,
putMessageResult
,
msg
);
return
putMessageResult
;
}
public
void
handleDiskFlush
(
AppendMessageResult
result
,
PutMessageResult
putMessageResult
,
MessageExt
messageExt
)
{
// Synchronization flush
if
(
FlushDiskType
.
SYNC_FLUSH
==
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushDiskType
())
{
final
GroupCommitService
service
=
(
GroupCommitService
)
this
.
flushCommitLogService
;
if
(
m
sg
.
isWaitStoreMsgOK
())
{
request
=
new
GroupCommitRequest
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
());
if
(
m
essageExt
.
isWaitStoreMsgOK
())
{
GroupCommitRequest
request
=
new
GroupCommitRequest
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
());
service
.
putRequest
(
request
);
boolean
flushOK
=
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
if
(!
flushOK
)
{
log
.
error
(
"do groupcommit, wait for flush failed, topic: "
+
m
sg
.
getTopic
()
+
" tags: "
+
msg
.
getTags
()
+
" client address: "
+
m
sg
.
getBornHostString
());
log
.
error
(
"do groupcommit, wait for flush failed, topic: "
+
m
essageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
m
essageExt
.
getBornHostString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
);
}
}
else
{
...
...
@@ -659,26 +672,22 @@ public class CommitLog {
commitLogService
.
wakeup
();
}
}
}
// Synchronous write double
public
void
handleHA
(
AppendMessageResult
result
,
PutMessageResult
putMessageResult
,
MessageExt
messageExt
)
{
if
(
BrokerRole
.
SYNC_MASTER
==
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getBrokerRole
())
{
HAService
service
=
this
.
defaultMessageStore
.
getHaService
();
if
(
m
sg
.
isWaitStoreMsgOK
())
{
if
(
m
essageExt
.
isWaitStoreMsgOK
())
{
// Determine whether to wait
if
(
service
.
isSlaveOK
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
()))
{
if
(
null
==
request
)
{
request
=
new
GroupCommitRequest
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
());
}
GroupCommitRequest
request
=
new
GroupCommitRequest
(
result
.
getWroteOffset
()
+
result
.
getWroteBytes
());
service
.
putRequest
(
request
);
service
.
getWaitNotifyObject
().
wakeupAll
();
boolean
flushOK
=
// TODO
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
if
(!
flushOK
)
{
log
.
error
(
"do sync transfer other node, wait return, but failed, topic: "
+
m
sg
.
getTopic
()
+
" tags: "
+
m
sg
.
getTags
()
+
" client address: "
+
msg
.
getBornHost
String
());
log
.
error
(
"do sync transfer other node, wait return, but failed, topic: "
+
m
essageExt
.
getTopic
()
+
" tags: "
+
m
essageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostName
String
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
);
}
}
...
...
@@ -690,12 +699,109 @@ public class CommitLog {
}
}
}
public
PutMessageResult
putMessages
(
final
MessageExtBatch
messageExtBatch
)
{
messageExtBatch
.
setStoreTimestamp
(
System
.
currentTimeMillis
());
AppendMessageResult
result
;
StoreStatsService
storeStatsService
=
this
.
defaultMessageStore
.
getStoreStatsService
();
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
messageExtBatch
.
getSysFlag
());
if
(
tranType
!=
MessageSysFlag
.
TRANSACTION_NOT_TYPE
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
if
(
messageExtBatch
.
getDelayTimeLevel
()
>
0
)
{
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
long
eclipseTimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder
batchEncoder
=
batchEncoderThreadLocal
.
get
();
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
lockForPutMessage
();
//spin...
try
{
long
beginLockTimestamp
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
this
.
beginTimeInLock
=
beginLockTimestamp
;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch
.
setStoreTimestamp
(
beginLockTimestamp
);
if
(
null
==
mappedFile
||
mappedFile
.
isFull
())
{
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
(
0
);
// Mark: NewFile may be cause noise
}
if
(
null
==
mappedFile
)
{
log
.
error
(
"Create maped file1 error, topic: {} clientAddr: {}"
,
messageExtBatch
.
getTopic
(),
messageExtBatch
.
getBornHostString
());
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
null
);
}
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
);
switch
(
result
.
getStatus
())
{
case
PUT_OK:
break
;
case
END_OF_FILE:
unlockMappedFile
=
mappedFile
;
// Create a new file, re-write the message
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
(
0
);
if
(
null
==
mappedFile
)
{
// XXX: warn and notify me
log
.
error
(
"Create maped file2 error, topic: {} clientAddr: {}"
,
messageExtBatch
.
getTopic
(),
messageExtBatch
.
getBornHostString
());
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
CREATE_MAPEDFILE_FAILED
,
result
);
}
result
=
mappedFile
.
appendMessages
(
messageExtBatch
,
this
.
appendMessageCallback
);
break
;
case
MESSAGE_SIZE_EXCEEDED:
case
PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
result
);
case
UNKNOWN_ERROR:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
default
:
beginTimeInLock
=
0
;
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
}
eclipseTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
beginTimeInLock
=
0
;
}
finally
{
releasePutMessageLock
();
}
if
(
eclipseTimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
eclipseTimeInLock
,
messageExtBatch
.
getBody
().
length
,
result
);
}
if
(
null
!=
unlockMappedFile
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isWarmMapedFileEnable
())
{
this
.
defaultMessageStore
.
unlockMappedFile
(
unlockMappedFile
);
}
PutMessageResult
putMessageResult
=
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
result
);
// Statistics
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
messageExtBatch
.
getTopic
()).
addAndGet
(
result
.
getMsgNum
());
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
messageExtBatch
.
getTopic
()).
addAndGet
(
result
.
getWroteBytes
());
handleDiskFlush
(
result
,
putMessageResult
,
messageExtBatch
);
handleHA
(
result
,
putMessageResult
,
messageExtBatch
);
return
putMessageResult
;
}
/**
* According to receive certain message or offset storage time if an error
* occurs, it returns -1
* According to receive certain message or offset storage time if an error occurs, it returns -1
*/
public
long
pickupStoreTimestamp
(
final
long
offset
,
final
int
size
)
{
if
(
offset
>=
this
.
getMinOffset
())
{
...
...
@@ -1098,6 +1204,8 @@ public class CommitLog {
// Build Message Key
private
final
StringBuilder
keyBuilder
=
new
StringBuilder
();
private
final
StringBuilder
msgIdBuilder
=
new
StringBuilder
();
private
final
ByteBuffer
hostHolder
=
ByteBuffer
.
allocate
(
8
);
DefaultAppendMessageCallback
(
final
int
size
)
{
...
...
@@ -1110,7 +1218,8 @@ public class CommitLog {
return
msgStoreItemMemory
;
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msgInner
)
{
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msgInner
)
{
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
...
...
@@ -1218,7 +1327,7 @@ public class CommitLog {
// 12 STOREHOSTADDRESS
this
.
resetByteBuffer
(
hostHolder
,
8
);
this
.
msgStoreItemMemory
.
put
(
msgInner
.
getStoreHostBytes
(
hostHolder
));
//this.msg
StoreItem
Memory.put(msgInner.getStoreHostBytes());
//this.msg
Batch
Memory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this
.
msgStoreItemMemory
.
putInt
(
msgInner
.
getReconsumeTimes
());
// 14 Prepared Transaction Offset
...
...
@@ -1257,9 +1366,202 @@ public class CommitLog {
return
result
;
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
)
{
byteBuffer
.
mark
();
//physical offset
long
wroteOffset
=
fileFromOffset
+
byteBuffer
.
position
();
// Record ConsumeQueue information
keyBuilder
.
setLength
(
0
);
keyBuilder
.
append
(
messageExtBatch
.
getTopic
());
keyBuilder
.
append
(
'-'
);
keyBuilder
.
append
(
messageExtBatch
.
getQueueId
());
String
key
=
keyBuilder
.
toString
();
Long
queueOffset
=
CommitLog
.
this
.
topicQueueTable
.
get
(
key
);
if
(
null
==
queueOffset
)
{
queueOffset
=
0L
;
CommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
}
long
beginQueueOffset
=
queueOffset
;
int
totalMsgLen
=
0
;
int
msgNum
=
0
;
msgIdBuilder
.
setLength
(
0
);
final
long
beginTimeMills
=
CommitLog
.
this
.
defaultMessageStore
.
now
();
ByteBuffer
messagesByteBuff
=
messageExtBatch
.
getEncodedBuff
();
this
.
resetByteBuffer
(
hostHolder
,
8
);
ByteBuffer
storeHostBytes
=
messageExtBatch
.
getStoreHostBytes
(
hostHolder
);
messagesByteBuff
.
mark
();
while
(
messagesByteBuff
.
hasRemaining
())
{
// 1 TOTALSIZE
final
int
msgPos
=
messagesByteBuff
.
position
();
final
int
msgLen
=
messagesByteBuff
.
getInt
();
final
int
bodyLen
=
msgLen
-
40
;
//only for log, just estimate it
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLen
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
MESSAGE_SIZE_EXCEEDED
);
}
totalMsgLen
+=
msgLen
;
// Determines whether there is sufficient free space
if
((
totalMsgLen
+
END_FILE_MIN_BLANK_LENGTH
)
>
maxBlank
)
{
this
.
resetByteBuffer
(
this
.
msgStoreItemMemory
,
8
);
// 1 TOTALSIZE
this
.
msgStoreItemMemory
.
putInt
(
maxBlank
);
// 2 MAGICCODE
this
.
msgStoreItemMemory
.
putInt
(
CommitLog
.
BLANK_MAGIC_CODE
);
// 3 The remaining space may be any value
//
//ignore previous read
messagesByteBuff
.
reset
();
// Here the length of the specially set maxBlank
byteBuffer
.
reset
();
//ignore the previous appended messages
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
8
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgIdBuilder
.
toString
(),
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
//move to add queue offset and commitlog offset
messagesByteBuff
.
position
(
msgPos
+
20
);
messagesByteBuff
.
putLong
(
queueOffset
);
messagesByteBuff
.
putLong
(
wroteOffset
+
totalMsgLen
-
msgLen
);
storeHostBytes
.
rewind
();
String
msgId
=
MessageDecoder
.
createMessageId
(
this
.
msgIdMemory
,
storeHostBytes
,
wroteOffset
+
totalMsgLen
-
msgLen
);
if
(
msgIdBuilder
.
length
()
>
0
)
{
msgIdBuilder
.
append
(
','
).
append
(
msgId
);
}
else
{
msgIdBuilder
.
append
(
msgId
);
}
queueOffset
++;
msgNum
++;
messagesByteBuff
.
position
(
msgPos
+
msgLen
);
}
messagesByteBuff
.
position
(
0
);
messagesByteBuff
.
limit
(
totalMsgLen
);
byteBuffer
.
put
(
messagesByteBuff
);
messageExtBatch
.
setEncodedBuff
(
null
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
totalMsgLen
,
msgIdBuilder
.
toString
(),
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
result
.
setMsgNum
(
msgNum
);
CommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
return
result
;
}
private
void
resetByteBuffer
(
final
ByteBuffer
byteBuffer
,
final
int
limit
)
{
byteBuffer
.
flip
();
byteBuffer
.
limit
(
limit
);
}
}
public
static
class
MessageExtBatchEncoder
{
// Store the message content
private
final
ByteBuffer
msgBatchMemory
;
// The maximum length of the message
private
final
int
maxMessageSize
;
private
final
ByteBuffer
hostHolder
=
ByteBuffer
.
allocate
(
8
);
MessageExtBatchEncoder
(
final
int
size
)
{
this
.
msgBatchMemory
=
ByteBuffer
.
allocateDirect
(
size
);
this
.
maxMessageSize
=
size
;
}
public
ByteBuffer
encode
(
final
MessageExtBatch
messageExtBatch
)
{
msgBatchMemory
.
clear
();
//not thread-safe
int
totalMsgLen
=
0
;
ByteBuffer
messagesByteBuff
=
messageExtBatch
.
wrap
();
while
(
messagesByteBuff
.
hasRemaining
())
{
// 1 TOTALSIZE
messagesByteBuff
.
getInt
();
// 2 MAGICCODE
messagesByteBuff
.
getInt
();
// 3 BODYCRC
messagesByteBuff
.
getInt
();
// 4 FLAG
int
flag
=
messagesByteBuff
.
getInt
();
// 5 BODY
int
bodyLen
=
messagesByteBuff
.
getInt
();
int
bodyPos
=
messagesByteBuff
.
position
();
int
bodyCrc
=
UtilAll
.
crc32
(
messagesByteBuff
.
array
(),
bodyPos
,
bodyLen
);
messagesByteBuff
.
position
(
bodyPos
+
bodyLen
);
// 6 properties
short
propertiesLen
=
messagesByteBuff
.
getShort
();
int
propertiesPos
=
messagesByteBuff
.
position
();
messagesByteBuff
.
position
(
propertiesPos
+
propertiesLen
);
final
byte
[]
topicData
=
messageExtBatch
.
getTopic
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
topicLength
=
topicData
.
length
;
final
int
msgLen
=
calMsgLength
(
bodyLen
,
topicLength
,
propertiesLen
);
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLen
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
throw
new
RuntimeException
(
"message size exceeded"
);
}
totalMsgLen
+=
msgLen
;
// Determines whether there is sufficient free space
if
(
totalMsgLen
>
maxMessageSize
)
{
throw
new
RuntimeException
(
"message size exceeded"
);
}
// 1 TOTALSIZE
this
.
msgBatchMemory
.
putInt
(
msgLen
);
// 2 MAGICCODE
this
.
msgBatchMemory
.
putInt
(
CommitLog
.
MESSAGE_MAGIC_CODE
);
// 3 BODYCRC
this
.
msgBatchMemory
.
putInt
(
bodyCrc
);
// 4 QUEUEID
this
.
msgBatchMemory
.
putInt
(
messageExtBatch
.
getQueueId
());
// 5 FLAG
this
.
msgBatchMemory
.
putInt
(
flag
);
// 6 QUEUEOFFSET
this
.
msgBatchMemory
.
putLong
(
0
);
// 7 PHYSICALOFFSET
this
.
msgBatchMemory
.
putLong
(
0
);
// 8 SYSFLAG
this
.
msgBatchMemory
.
putInt
(
messageExtBatch
.
getSysFlag
());
// 9 BORNTIMESTAMP
this
.
msgBatchMemory
.
putLong
(
messageExtBatch
.
getBornTimestamp
());
// 10 BORNHOST
this
.
resetByteBuffer
(
hostHolder
,
8
);
this
.
msgBatchMemory
.
put
(
messageExtBatch
.
getBornHostBytes
(
hostHolder
));
// 11 STORETIMESTAMP
this
.
msgBatchMemory
.
putLong
(
messageExtBatch
.
getStoreTimestamp
());
// 12 STOREHOSTADDRESS
this
.
resetByteBuffer
(
hostHolder
,
8
);
this
.
msgBatchMemory
.
put
(
messageExtBatch
.
getStoreHostBytes
(
hostHolder
));
// 13 RECONSUMETIMES
this
.
msgBatchMemory
.
putInt
(
messageExtBatch
.
getReconsumeTimes
());
// 14 Prepared Transaction Offset, batch does not support transaction
this
.
msgBatchMemory
.
putLong
(
0
);
// 15 BODY
this
.
msgBatchMemory
.
putInt
(
bodyLen
);
if
(
bodyLen
>
0
)
this
.
msgBatchMemory
.
put
(
messagesByteBuff
.
array
(),
bodyPos
,
bodyLen
);
// 16 TOPIC
this
.
msgBatchMemory
.
put
((
byte
)
topicLength
);
this
.
msgBatchMemory
.
put
(
topicData
);
// 17 PROPERTIES
this
.
msgBatchMemory
.
putShort
(
propertiesLen
);
if
(
propertiesLen
>
0
)
this
.
msgBatchMemory
.
put
(
messagesByteBuff
.
array
(),
propertiesPos
,
propertiesLen
);
}
msgBatchMemory
.
flip
();
return
msgBatchMemory
;
}
private
void
resetByteBuffer
(
final
ByteBuffer
byteBuffer
,
final
int
limit
)
{
byteBuffer
.
flip
();
byteBuffer
.
limit
(
limit
);
}
}
}
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
浏览文件 @
47fad3c1
...
...
@@ -331,7 +331,7 @@ public class ConsumeQueue {
public
void
putMessagePositionInfoWrapper
(
long
offset
,
int
size
,
long
tagsCode
,
long
storeTimestamp
,
long
logicOffset
)
{
final
int
maxRetries
=
30
;
boolean
canWrite
=
this
.
defaultMessageStore
.
getRunningFlags
().
isWriteable
();
boolean
canWrite
=
this
.
defaultMessageStore
.
getRunningFlags
().
is
CQ
Writeable
();
for
(
int
i
=
0
;
i
<
maxRetries
&&
canWrite
;
i
++)
{
boolean
result
=
this
.
putMessagePositionInfo
(
offset
,
size
,
tagsCode
,
logicOffset
);
if
(
result
)
{
...
...
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
47fad3c1
...
...
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.UtilAll;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
import
org.apache.rocketmq.common.running.RunningStats
;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
...
...
@@ -325,6 +326,62 @@ public class DefaultMessageStore implements MessageStore {
return
result
;
}
public
PutMessageResult
putMessages
(
MessageExtBatch
messageExtBatch
)
{
if
(
this
.
shutdown
)
{
log
.
warn
(
"DefaultMessageStore has shutdown, so putMessages is forbidden"
);
return
new
PutMessageResult
(
PutMessageStatus
.
SERVICE_NOT_AVAILABLE
,
null
);
}
if
(
BrokerRole
.
SLAVE
==
this
.
messageStoreConfig
.
getBrokerRole
())
{
long
value
=
this
.
printTimes
.
getAndIncrement
();
if
((
value
%
50000
)
==
0
)
{
log
.
warn
(
"DefaultMessageStore is in slave mode, so putMessages is forbidden "
);
}
return
new
PutMessageResult
(
PutMessageStatus
.
SERVICE_NOT_AVAILABLE
,
null
);
}
if
(!
this
.
runningFlags
.
isWriteable
())
{
long
value
=
this
.
printTimes
.
getAndIncrement
();
if
((
value
%
50000
)
==
0
)
{
log
.
warn
(
"DefaultMessageStore is not writable, so putMessages is forbidden "
+
this
.
runningFlags
.
getFlagBits
());
}
return
new
PutMessageResult
(
PutMessageStatus
.
SERVICE_NOT_AVAILABLE
,
null
);
}
else
{
this
.
printTimes
.
set
(
0
);
}
if
(
messageExtBatch
.
getTopic
().
length
()
>
Byte
.
MAX_VALUE
)
{
log
.
warn
(
"PutMessages topic length too long "
+
messageExtBatch
.
getTopic
().
length
());
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
if
(
messageExtBatch
.
getBody
().
length
>
messageStoreConfig
.
getMaxMessageSize
())
{
log
.
warn
(
"PutMessages body length too long "
+
messageExtBatch
.
getBody
().
length
);
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
if
(
this
.
isOSPageCacheBusy
())
{
return
new
PutMessageResult
(
PutMessageStatus
.
OS_PAGECACHE_BUSY
,
null
);
}
long
beginTime
=
this
.
getSystemClock
().
now
();
PutMessageResult
result
=
this
.
commitLog
.
putMessages
(
messageExtBatch
);
long
eclipseTime
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
eclipseTime
>
500
)
{
log
.
warn
(
"not in lock eclipse time(ms)={}, bodyLength={}"
,
eclipseTime
,
messageExtBatch
.
getBody
().
length
);
}
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
eclipseTime
);
if
(
null
==
result
||
!
result
.
isOk
())
{
this
.
storeStatsService
.
getPutMessageFailedTimes
().
incrementAndGet
();
}
return
result
;
}
@Override
public
boolean
isOSPageCacheBusy
()
{
long
begin
=
this
.
getCommitLog
().
getBeginTimeInLock
();
...
...
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
浏览文件 @
47fad3c1
...
...
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.util.LibC
;
import
org.slf4j.Logger
;
...
...
@@ -187,7 +189,15 @@ public class MappedFile extends ReferenceResource {
}
public
AppendMessageResult
appendMessage
(
final
MessageExtBrokerInner
msg
,
final
AppendMessageCallback
cb
)
{
assert
msg
!=
null
;
return
appendMessagesInner
(
msg
,
cb
);
}
public
AppendMessageResult
appendMessages
(
final
MessageExtBatch
messageExtBatch
,
final
AppendMessageCallback
cb
)
{
return
appendMessagesInner
(
messageExtBatch
,
cb
);
}
public
AppendMessageResult
appendMessagesInner
(
final
MessageExt
messageExt
,
final
AppendMessageCallback
cb
)
{
assert
messageExt
!=
null
;
assert
cb
!=
null
;
int
currentPos
=
this
.
wrotePosition
.
get
();
...
...
@@ -195,30 +205,28 @@ public class MappedFile extends ReferenceResource {
if
(
currentPos
<
this
.
fileSize
)
{
ByteBuffer
byteBuffer
=
writeBuffer
!=
null
?
writeBuffer
.
slice
()
:
this
.
mappedByteBuffer
.
slice
();
byteBuffer
.
position
(
currentPos
);
AppendMessageResult
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
msg
);
AppendMessageResult
result
=
null
;
if
(
messageExt
instanceof
MessageExtBrokerInner
)
{
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBrokerInner
)
messageExt
);
}
else
if
(
messageExt
instanceof
MessageExtBatch
)
{
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBatch
)
messageExt
);
}
else
{
return
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
);
}
this
.
wrotePosition
.
addAndGet
(
result
.
getWroteBytes
());
this
.
storeTimestamp
=
result
.
getStoreTimestamp
();
return
result
;
}
log
.
error
(
"MappedFile.appendMessage return null, wrotePosition: "
+
currentPos
+
" fileSize: "
+
this
.
fileSize
);
log
.
error
(
"MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}"
,
currentPos
,
this
.
fileSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
UNKNOWN_ERROR
);
}
/**
*/
public
long
getFileFromOffset
()
{
return
this
.
fileFromOffset
;
}
/**
*
*/
public
boolean
appendMessage
(
final
byte
[]
data
)
{
int
currentPos
=
this
.
wrotePosition
.
get
();
...
...
store/src/main/java/org/apache/rocketmq/store/MessageStore.java
浏览文件 @
47fad3c1
...
...
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
import
java.util.HashMap
;
import
java.util.Set
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
public
interface
MessageStore
{
...
...
@@ -33,6 +34,8 @@ public interface MessageStore {
PutMessageResult
putMessage
(
final
MessageExtBrokerInner
msg
);
PutMessageResult
putMessages
(
final
MessageExtBatch
messageExtBatch
);
GetMessageResult
getMessage
(
final
String
group
,
final
String
topic
,
final
int
queueId
,
final
long
offset
,
final
int
maxMsgNums
,
final
SubscriptionData
subscriptionData
);
...
...
store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
浏览文件 @
47fad3c1
...
...
@@ -27,6 +27,8 @@ public class RunningFlags {
private
static
final
int
WRITE_INDEX_FILE_ERROR_BIT
=
1
<<
3
;
private
static
final
int
DISK_FULL_BIT
=
1
<<
4
;
private
volatile
int
flagBits
=
0
;
public
RunningFlags
()
{
...
...
@@ -76,6 +78,15 @@ public class RunningFlags {
return
false
;
}
//for consume queue, just ignore the DISK_FULL_BIT
public
boolean
isCQWriteable
()
{
if
((
this
.
flagBits
&
(
NOT_WRITEABLE_BIT
|
WRITE_LOGICS_QUEUE_ERROR_BIT
|
WRITE_INDEX_FILE_ERROR_BIT
))
==
0
)
{
return
true
;
}
return
false
;
}
public
boolean
getAndMakeNotWriteable
()
{
boolean
result
=
this
.
isWriteable
();
if
(
result
)
{
...
...
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
浏览文件 @
47fad3c1
...
...
@@ -132,6 +132,7 @@ public class MessageStoreConfig {
private
int
transientStorePoolSize
=
5
;
private
boolean
fastFailIfNoBufferInStorePool
=
false
;
public
boolean
isDebugLockEnable
()
{
return
debugLockEnable
;
}
...
...
@@ -629,4 +630,5 @@ public class MessageStoreConfig {
public
void
setCommitCommitLogThoroughInterval
(
final
int
commitCommitLogThoroughInterval
)
{
this
.
commitCommitLogThoroughInterval
=
commitCommitLogThoroughInterval
;
}
}
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
浏览文件 @
47fad3c1
...
...
@@ -123,7 +123,9 @@ public class BrokerStatsManager {
public
void
incTopicPutNums
(
final
String
topic
)
{
this
.
statsTable
.
get
(
TOPIC_PUT_NUMS
).
addValue
(
topic
,
1
,
1
);
}
public
void
incTopicPutNums
(
final
String
topic
,
int
num
,
int
times
)
{
this
.
statsTable
.
get
(
TOPIC_PUT_NUMS
).
addValue
(
topic
,
num
,
times
);
}
public
void
incTopicPutSize
(
final
String
topic
,
final
int
size
)
{
this
.
statsTable
.
get
(
TOPIC_PUT_SIZE
).
addValue
(
topic
,
size
,
1
);
}
...
...
@@ -154,7 +156,9 @@ public class BrokerStatsManager {
public
void
incBrokerPutNums
()
{
this
.
statsTable
.
get
(
BROKER_PUT_NUMS
).
getAndCreateStatsItem
(
this
.
clusterName
).
getValue
().
incrementAndGet
();
}
public
void
incBrokerPutNums
(
final
int
incValue
)
{
this
.
statsTable
.
get
(
BROKER_PUT_NUMS
).
getAndCreateStatsItem
(
this
.
clusterName
).
getValue
().
addAndGet
(
incValue
);
}
public
void
incBrokerGetNums
(
final
int
incValue
)
{
this
.
statsTable
.
get
(
BROKER_GET_NUMS
).
getAndCreateStatsItem
(
this
.
clusterName
).
getValue
().
addAndGet
(
incValue
);
}
...
...
store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
0 → 100644
浏览文件 @
47fad3c1
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.store
;
import
java.io.File
;
import
java.net.InetSocketAddress
;
import
java.nio.ByteBuffer
;
import
java.util.ArrayList
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.junit.Before
;
import
org.junit.Test
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
public
class
AppendCallbackTest
{
AppendMessageCallback
callback
;
CommitLog
.
MessageExtBatchEncoder
batchEncoder
=
new
CommitLog
.
MessageExtBatchEncoder
(
10
*
1024
*
1024
);
@Before
public
void
init
()
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
10
);
messageStoreConfig
.
setStorePathRootDir
(
System
.
getProperty
(
"user.home"
)
+
File
.
separator
+
"unitteststore"
);
messageStoreConfig
.
setStorePathCommitLog
(
System
.
getProperty
(
"user.home"
)
+
File
.
separator
+
"unitteststore"
+
File
.
separator
+
"commitlog"
);
//too much reference
DefaultMessageStore
messageStore
=
new
DefaultMessageStore
(
messageStoreConfig
,
null
,
null
,
null
);
CommitLog
commitLog
=
new
CommitLog
(
messageStore
);
callback
=
commitLog
.
new
DefaultAppendMessageCallback
(
1024
);
}
@Test
public
void
testAppendMessageBatchEndOfFile
()
throws
Exception
{
List
<
Message
>
messages
=
new
ArrayList
<>();
String
topic
=
"test-topic"
;
int
queue
=
0
;
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
Message
msg
=
new
Message
();
msg
.
setBody
(
"body"
.
getBytes
());
msg
.
setTopic
(
topic
);
msg
.
setTags
(
"abc"
);
messages
.
add
(
msg
);
}
MessageExtBatch
messageExtBatch
=
new
MessageExtBatch
();
messageExtBatch
.
setTopic
(
topic
);
messageExtBatch
.
setQueueId
(
queue
);
messageExtBatch
.
setBornTimestamp
(
System
.
currentTimeMillis
());
messageExtBatch
.
setBornHost
(
new
InetSocketAddress
(
"127.0.0.1"
,
123
));
messageExtBatch
.
setStoreHost
(
new
InetSocketAddress
(
"127.0.0.1"
,
124
));
messageExtBatch
.
setBody
(
MessageDecoder
.
encodeMessages
(
messages
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
ByteBuffer
buff
=
ByteBuffer
.
allocate
(
1024
*
10
);
//encounter end of file when append half of the data
AppendMessageResult
result
=
callback
.
doAppend
(
0
,
buff
,
1000
,
messageExtBatch
);
assertEquals
(
AppendMessageStatus
.
END_OF_FILE
,
result
.
getStatus
());
assertEquals
(
0
,
result
.
getWroteOffset
());
assertEquals
(
0
,
result
.
getLogicsOffset
());
assertEquals
(
1000
,
result
.
getWroteBytes
());
assertEquals
(
8
,
buff
.
position
());
//write blank size and magic value
assertTrue
(
result
.
getMsgId
().
length
()
>
0
);
//should have already constructed some message ids
}
@Test
public
void
testAppendMessageBatchSucc
()
throws
Exception
{
List
<
Message
>
messages
=
new
ArrayList
<>();
String
topic
=
"test-topic"
;
int
queue
=
0
;
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
Message
msg
=
new
Message
();
msg
.
setBody
(
"body"
.
getBytes
());
msg
.
setTopic
(
topic
);
msg
.
setTags
(
"abc"
);
messages
.
add
(
msg
);
}
MessageExtBatch
messageExtBatch
=
new
MessageExtBatch
();
messageExtBatch
.
setTopic
(
topic
);
messageExtBatch
.
setQueueId
(
queue
);
messageExtBatch
.
setBornTimestamp
(
System
.
currentTimeMillis
());
messageExtBatch
.
setBornHost
(
new
InetSocketAddress
(
"127.0.0.1"
,
123
));
messageExtBatch
.
setStoreHost
(
new
InetSocketAddress
(
"127.0.0.1"
,
124
));
messageExtBatch
.
setBody
(
MessageDecoder
.
encodeMessages
(
messages
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
ByteBuffer
buff
=
ByteBuffer
.
allocate
(
1024
*
10
);
AppendMessageResult
allresult
=
callback
.
doAppend
(
0
,
buff
,
1024
*
10
,
messageExtBatch
);
assertEquals
(
AppendMessageStatus
.
PUT_OK
,
allresult
.
getStatus
());
assertEquals
(
0
,
allresult
.
getWroteOffset
());
assertEquals
(
0
,
allresult
.
getLogicsOffset
());
assertEquals
(
buff
.
position
(),
allresult
.
getWroteBytes
());
assertEquals
(
messages
.
size
(),
allresult
.
getMsgNum
());
Set
<
String
>
msgIds
=
new
HashSet
<>();
for
(
String
msgId:
allresult
.
getMsgId
().
split
(
","
))
{
assertEquals
(
32
,
msgId
.
length
());
msgIds
.
add
(
msgId
);
}
assertEquals
(
messages
.
size
(),
msgIds
.
size
());
List
<
MessageExt
>
decodeMsgs
=
MessageDecoder
.
decodes
((
ByteBuffer
)
buff
.
flip
());
assertEquals
(
decodeMsgs
.
size
(),
decodeMsgs
.
size
());
long
queueOffset
=
decodeMsgs
.
get
(
0
).
getQueueOffset
();
long
storeTimeStamp
=
decodeMsgs
.
get
(
0
).
getStoreTimestamp
();
for
(
int
i
=
0
;
i
<
messages
.
size
();
i
++)
{
assertEquals
(
messages
.
get
(
i
).
getTopic
(),
decodeMsgs
.
get
(
i
).
getTopic
());
assertEquals
(
new
String
(
messages
.
get
(
i
).
getBody
()),
new
String
(
decodeMsgs
.
get
(
i
).
getBody
()));
assertEquals
(
messages
.
get
(
i
).
getTags
(),
decodeMsgs
.
get
(
i
).
getTags
());
assertEquals
(
messageExtBatch
.
getBornHostNameString
(),
decodeMsgs
.
get
(
i
).
getBornHostNameString
());
assertEquals
(
messageExtBatch
.
getBornTimestamp
(),
decodeMsgs
.
get
(
i
).
getBornTimestamp
());
assertEquals
(
storeTimeStamp
,
decodeMsgs
.
get
(
i
).
getStoreTimestamp
());
assertEquals
(
queueOffset
++,
decodeMsgs
.
get
(
i
).
getQueueOffset
());
}
}
}
test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
0 → 100644
浏览文件 @
47fad3c1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.test.client.producer.batch
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Random
;
import
org.apache.log4j.Logger
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.test.base.BaseConf
;
import
org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT
;
import
org.apache.rocketmq.test.factory.ProducerFactory
;
import
org.apache.rocketmq.test.util.RandomUtils
;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
public
class
BatchSendIT
extends
BaseConf
{
private
static
Logger
logger
=
Logger
.
getLogger
(
TagMessageWith1ConsumerIT
.
class
);
private
String
topic
=
null
;
private
Random
random
=
new
Random
();
@Before
public
void
setUp
()
{
topic
=
initTopic
();
logger
.
info
(
String
.
format
(
"user topic[%s]!"
,
topic
));
}
@After
public
void
tearDown
()
{
super
.
shutDown
();
}
@Test
public
void
testBatchSend_ViewMessage
()
throws
Exception
{
List
<
Message
>
messageList
=
new
ArrayList
<>();
int
batchNum
=
100
;
for
(
int
i
=
0
;
i
<
batchNum
;
i
++)
{
messageList
.
add
(
new
Message
(
topic
,
RandomUtils
.
getStringByUUID
().
getBytes
()));
}
DefaultMQProducer
producer
=
ProducerFactory
.
getRMQProducer
(
nsAddr
);
SendResult
sendResult
=
producer
.
send
(
messageList
);
Assert
.
assertEquals
(
SendStatus
.
SEND_OK
,
sendResult
.
getSendStatus
());
String
[]
offsetIds
=
sendResult
.
getOffsetMsgId
().
split
(
","
);
String
[]
msgIds
=
sendResult
.
getMsgId
().
split
(
","
);
Assert
.
assertEquals
(
messageList
.
size
(),
offsetIds
.
length
);
Assert
.
assertEquals
(
messageList
.
size
(),
msgIds
.
length
);
Thread
.
sleep
(
2000
);
for
(
int
i
=
0
;
i
<
3
;
i
++)
{
producer
.
viewMessage
(
offsetIds
[
random
.
nextInt
(
batchNum
)]);
}
for
(
int
i
=
0
;
i
<
3
;
i
++)
{
producer
.
viewMessage
(
topic
,
msgIds
[
random
.
nextInt
(
batchNum
)]);
}
}
@Test
public
void
testBatchSend_CheckProperties
()
throws
Exception
{
List
<
Message
>
messageList
=
new
ArrayList
<>();
Message
message
=
new
Message
();
message
.
setTopic
(
topic
);
message
.
setKeys
(
"keys123"
);
message
.
setTags
(
"tags123"
);
message
.
setWaitStoreMsgOK
(
false
);
message
.
setBuyerId
(
"buyerid123"
);
message
.
setFlag
(
123
);
message
.
setBody
(
"body"
.
getBytes
());
messageList
.
add
(
message
);
DefaultMQProducer
producer
=
ProducerFactory
.
getRMQProducer
(
nsAddr
);
SendResult
sendResult
=
producer
.
send
(
messageList
);
Assert
.
assertEquals
(
SendStatus
.
SEND_OK
,
sendResult
.
getSendStatus
());
String
[]
offsetIds
=
sendResult
.
getOffsetMsgId
().
split
(
","
);
String
[]
msgIds
=
sendResult
.
getMsgId
().
split
(
","
);
Assert
.
assertEquals
(
messageList
.
size
(),
offsetIds
.
length
);
Assert
.
assertEquals
(
messageList
.
size
(),
msgIds
.
length
);
Thread
.
sleep
(
2000
);
Message
messageByOffset
=
producer
.
viewMessage
(
offsetIds
[
0
]);
Message
messageByMsgId
=
producer
.
viewMessage
(
topic
,
msgIds
[
0
]);
System
.
out
.
println
(
messageByOffset
);
System
.
out
.
println
(
messageByMsgId
);
Assert
.
assertEquals
(
message
.
getTopic
(),
messageByMsgId
.
getTopic
());
Assert
.
assertEquals
(
message
.
getTopic
(),
messageByOffset
.
getTopic
());
Assert
.
assertEquals
(
message
.
getKeys
(),
messageByOffset
.
getKeys
());
Assert
.
assertEquals
(
message
.
getKeys
(),
messageByMsgId
.
getKeys
());
Assert
.
assertEquals
(
message
.
getTags
(),
messageByOffset
.
getTags
());
Assert
.
assertEquals
(
message
.
getTags
(),
messageByMsgId
.
getTags
());
Assert
.
assertEquals
(
message
.
isWaitStoreMsgOK
(),
messageByOffset
.
isWaitStoreMsgOK
());
Assert
.
assertEquals
(
message
.
isWaitStoreMsgOK
(),
messageByMsgId
.
isWaitStoreMsgOK
());
Assert
.
assertEquals
(
message
.
getBuyerId
(),
messageByOffset
.
getBuyerId
());
Assert
.
assertEquals
(
message
.
getBuyerId
(),
messageByMsgId
.
getBuyerId
());
Assert
.
assertEquals
(
message
.
getFlag
(),
messageByOffset
.
getFlag
());
Assert
.
assertEquals
(
message
.
getFlag
(),
messageByMsgId
.
getFlag
());
}
}
test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
浏览文件 @
47fad3c1
...
...
@@ -61,7 +61,7 @@ public class MessageExceptionIT extends BaseConf {
@Test
(
expected
=
org
.
apache
.
rocketmq
.
client
.
exception
.
MQClientException
.
class
)
public
void
testSynSendNullMessage
()
throws
Exception
{
producer
.
send
(
null
);
producer
.
send
(
(
Message
)
null
);
}
@Test
(
expected
=
org
.
apache
.
rocketmq
.
client
.
exception
.
MQClientException
.
class
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录