Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
小五666\n哈哈
Rocketmq
提交
46b2e125
R
Rocketmq
项目概览
小五666\n哈哈
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
46b2e125
编写于
8月 27, 2019
作者:
Q
qqeasonchen
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[RIP-16]impl rpc support - code format
上级
c6c699eb
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
284 addition
and
178 deletion
+284
-178
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+13
-24
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
...va/org/apache/rocketmq/broker/client/ProducerManager.java
+1
-2
broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
...ache/rocketmq/broker/processor/ReplyMessageProcessor.java
+48
-22
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
.../org/apache/rocketmq/broker/topic/TopicConfigManager.java
+2
-2
client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
...he/rocketmq/client/exception/RequestTimeoutException.java
+18
-2
client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
.../apache/rocketmq/client/impl/ClientRemotingProcessor.java
+8
-6
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
...java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+6
-6
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
.../rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+37
-32
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
...rg/apache/rocketmq/client/producer/DefaultMQProducer.java
+24
-48
client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
.../java/org/apache/rocketmq/client/producer/MQProducer.java
+9
-8
client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
.../org/apache/rocketmq/client/producer/RequestCallback.java
+17
-0
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
...g/apache/rocketmq/client/producer/RequestFutureTable.java
+23
-7
client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
...pache/rocketmq/client/producer/RequestResponseFuture.java
+24
-10
client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
...in/java/org/apache/rocketmq/client/utils/MessageUtil.java
+17
-0
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
...rc/main/java/org/apache/rocketmq/common/BrokerConfig.java
+1
-1
common/src/main/java/org/apache/rocketmq/common/MixAll.java
common/src/main/java/org/apache/rocketmq/common/MixAll.java
+1
-7
common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
...tmq/common/protocol/header/ReplyMessageRequestHeader.java
+17
-0
common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java
.../java/org/apache/rocketmq/common/utils/RequestIdUtil.java
+18
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
46b2e125
...
...
@@ -61,8 +61,8 @@ import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import
org.apache.rocketmq.broker.processor.EndTransactionProcessor
;
import
org.apache.rocketmq.broker.processor.PullMessageProcessor
;
import
org.apache.rocketmq.broker.processor.QueryMessageProcessor
;
import
org.apache.rocketmq.broker.processor.SendMessageProcessor
;
import
org.apache.rocketmq.broker.processor.ReplyMessageProcessor
;
import
org.apache.rocketmq.broker.processor.SendMessageProcessor
;
import
org.apache.rocketmq.broker.slave.SlaveSynchronize
;
import
org.apache.rocketmq.broker.subscription.SubscriptionGroupManager
;
import
org.apache.rocketmq.broker.topic.TopicConfigManager
;
...
...
@@ -166,7 +166,7 @@ public class BrokerController {
private
TransactionalMessageService
transactionalMessageService
;
private
AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener
;
private
Future
<?>
slaveSyncFuture
;
private
Map
<
Class
,
AccessValidator
>
accessValidatorMap
=
new
HashMap
<
Class
,
AccessValidator
>();
private
Map
<
Class
,
AccessValidator
>
accessValidatorMap
=
new
HashMap
<
Class
,
AccessValidator
>();
public
BrokerController
(
final
BrokerConfig
brokerConfig
,
...
...
@@ -245,7 +245,7 @@ public class BrokerController {
this
.
brokerConfig
);
if
(
messageStoreConfig
.
isEnableDLegerCommitLog
())
{
DLedgerRoleChangeHandler
roleChangeHandler
=
new
DLedgerRoleChangeHandler
(
this
,
(
DefaultMessageStore
)
messageStore
);
((
DLedgerCommitLog
)((
DefaultMessageStore
)
messageStore
).
getCommitLog
()).
getdLedgerServer
().
getdLedgerLeaderElector
().
addRoleChangeHandler
(
roleChangeHandler
);
((
DLedgerCommitLog
)
((
DefaultMessageStore
)
messageStore
).
getCommitLog
()).
getdLedgerServer
().
getdLedgerLeaderElector
().
addRoleChangeHandler
(
roleChangeHandler
);
}
this
.
brokerStats
=
new
BrokerStats
((
DefaultMessageStore
)
this
.
messageStore
);
//load plugin
...
...
@@ -282,12 +282,12 @@ public class BrokerController {
new
ThreadFactoryImpl
(
"PullMessageThread_"
));
this
.
replyMessageExecutor
=
new
BrokerFixedThreadPoolExecutor
(
this
.
brokerConfig
.
getProcessReplyMessageThreadPoolNums
(),
this
.
brokerConfig
.
getProcessReplyMessageThreadPoolNums
(),
1000
*
60
,
TimeUnit
.
MILLISECONDS
,
this
.
replyThreadPoolQueue
,
new
ThreadFactoryImpl
(
"ProcessReplyMessageThread_"
));
this
.
brokerConfig
.
getProcessReplyMessageThreadPoolNums
(),
this
.
brokerConfig
.
getProcessReplyMessageThreadPoolNums
(),
1000
*
60
,
TimeUnit
.
MILLISECONDS
,
this
.
replyThreadPoolQueue
,
new
ThreadFactoryImpl
(
"ProcessReplyMessageThread_"
));
this
.
queryMessageExecutor
=
new
BrokerFixedThreadPoolExecutor
(
this
.
brokerConfig
.
getQueryMessageThreadPoolNums
(),
...
...
@@ -513,9 +513,9 @@ public class BrokerController {
return
;
}
for
(
AccessValidator
accessValidator:
accessValidators
)
{
for
(
AccessValidator
accessValidator
:
accessValidators
)
{
final
AccessValidator
validator
=
accessValidator
;
accessValidatorMap
.
put
(
validator
.
getClass
(),
validator
);
accessValidatorMap
.
put
(
validator
.
getClass
(),
validator
);
this
.
registerServerRPCHook
(
new
RPCHook
()
{
@Override
...
...
@@ -531,14 +531,13 @@ public class BrokerController {
}
}
private
void
initialRpcHooks
()
{
List
<
RPCHook
>
rpcHooks
=
ServiceProvider
.
load
(
ServiceProvider
.
RPC_HOOK_ID
,
RPCHook
.
class
);
if
(
rpcHooks
==
null
||
rpcHooks
.
isEmpty
())
{
return
;
}
for
(
RPCHook
rpcHook:
rpcHooks
)
{
for
(
RPCHook
rpcHook
:
rpcHooks
)
{
this
.
registerServerRPCHook
(
rpcHook
);
}
}
...
...
@@ -576,7 +575,6 @@ public class BrokerController {
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_REPLY_MESSAGE
,
replyMessageProcessor
,
replyMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_REPLY_MESSAGE_V2
,
replyMessageProcessor
,
replyMessageExecutor
);
/**
* QueryMessageProcessor
*/
...
...
@@ -887,8 +885,6 @@ public class BrokerController {
handleSlaveSynchronize
(
messageStoreConfig
.
getBrokerRole
());
}
this
.
registerBrokerAll
(
true
,
false
,
true
);
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
...
...
@@ -911,7 +907,6 @@ public class BrokerController {
this
.
brokerFastFailure
.
start
();
}
}
public
synchronized
void
registerIncrementBrokerData
(
TopicConfig
topicConfig
,
DataVersion
dataVersion
)
{
...
...
@@ -1125,7 +1120,6 @@ public class BrokerController {
this
.
transactionalMessageCheckListener
=
transactionalMessageCheckListener
;
}
public
BlockingQueue
<
Runnable
>
getEndTransactionThreadPoolQueue
()
{
return
endTransactionThreadPoolQueue
;
...
...
@@ -1146,8 +1140,7 @@ public class BrokerController {
public
void
run
()
{
try
{
BrokerController
.
this
.
slaveSynchronize
.
syncAll
();
}
catch
(
Throwable
e
)
{
}
catch
(
Throwable
e
)
{
log
.
error
(
"ScheduledTask SlaveSynchronize syncAll error."
,
e
);
}
}
...
...
@@ -1193,8 +1186,6 @@ public class BrokerController {
log
.
info
(
"Finish to change to slave brokerName={} brokerId={}"
,
brokerConfig
.
getBrokerName
(),
brokerId
);
}
public
void
changeToMaster
(
BrokerRole
role
)
{
if
(
role
==
BrokerRole
.
SLAVE
)
{
return
;
...
...
@@ -1244,6 +1235,4 @@ public class BrokerController {
}
}
}
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
浏览文件 @
46b2e125
...
...
@@ -17,7 +17,6 @@
package
org.apache.rocketmq.broker.client
;
import
io.netty.channel.Channel
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.Iterator
;
...
...
@@ -28,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.locks.Lock
;
import
java.util.concurrent.locks.ReentrantLock
;
import
org.apache.rocketmq.broker.util.PositiveAtomicCounter
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.logging.InternalLogger
;
...
...
@@ -46,6 +44,7 @@ public class ProducerManager {
new
HashMap
<
String
,
HashMap
<
Channel
,
ClientChannelInfo
>>();
private
final
ConcurrentHashMap
<
String
,
Channel
>
clientChannelTable
=
new
ConcurrentHashMap
<>();
private
PositiveAtomicCounter
positiveAtomicCounter
=
new
PositiveAtomicCounter
();
public
ProducerManager
()
{
}
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
浏览文件 @
46b2e125
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageContext
;
import
org.apache.rocketmq.common.*
;
import
org.apache.rocketmq.common.message.*
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader
;
...
...
@@ -16,7 +38,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.*
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
public
class
ReplyMessageProcessor
extends
AbstractSendMessageProcessor
implements
NettyRequestProcessor
{
...
...
@@ -26,7 +49,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
}
@Override
public
RemotingCommand
processRequest
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
public
RemotingCommand
processRequest
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
SendMessageContext
mqtraceContext
=
null
;
SendMessageRequestHeader
requestHeader
=
parseRequestHeader
(
request
);
if
(
requestHeader
==
null
)
{
...
...
@@ -49,13 +73,13 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
switch
(
request
.
getCode
())
{
case
RequestCode
.
SEND_REPLY_MESSAGE_V2
:
requestHeaderV2
=
(
SendMessageRequestHeaderV2
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeaderV2
.
class
);
(
SendMessageRequestHeaderV2
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeaderV2
.
class
);
case
RequestCode
.
SEND_REPLY_MESSAGE
:
if
(
null
==
requestHeaderV2
)
{
requestHeader
=
(
SendMessageRequestHeader
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeader
.
class
);
(
SendMessageRequestHeader
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeader
.
class
);
}
else
{
requestHeader
=
SendMessageRequestHeaderV2
.
createSendMessageRequestHeaderV1
(
requestHeaderV2
);
}
...
...
@@ -66,11 +90,11 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
}
private
RemotingCommand
processReplyMessageRequest
(
final
ChannelHandlerContext
ctx
,
final
RemotingCommand
request
,
final
SendMessageContext
sendMessageContext
,
final
SendMessageRequestHeader
requestHeader
)
{
final
RemotingCommand
request
,
final
SendMessageContext
sendMessageContext
,
final
SendMessageRequestHeader
requestHeader
)
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
SendMessageResponseHeader
.
class
);
final
SendMessageResponseHeader
responseHeader
=
(
SendMessageResponseHeader
)
response
.
readCustomHeader
();
final
SendMessageResponseHeader
responseHeader
=
(
SendMessageResponseHeader
)
response
.
readCustomHeader
();
response
.
setOpaque
(
request
.
getOpaque
());
...
...
@@ -126,7 +150,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
return
response
;
}
private
boolean
pushReplyMessage
(
final
ChannelHandlerContext
ctx
,
final
SendMessageRequestHeader
requestHeader
,
final
Message
msg
,
final
RemotingCommand
response
)
{
private
boolean
pushReplyMessage
(
final
ChannelHandlerContext
ctx
,
final
SendMessageRequestHeader
requestHeader
,
final
Message
msg
,
final
RemotingCommand
response
)
{
ReplyMessageRequestHeader
replyMessageRequestHeader
=
new
ReplyMessageRequestHeader
();
replyMessageRequestHeader
.
setBornHost
(
ctx
.
channel
().
remoteAddress
().
toString
());
replyMessageRequestHeader
.
setStoreHost
(
this
.
getStoreHost
().
toString
());
...
...
@@ -194,12 +219,13 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
}
private
void
handlePutMessageResult
(
PutMessageResult
putMessageResult
,
final
RemotingCommand
response
,
final
RemotingCommand
request
,
final
MessageExt
msg
,
final
SendMessageResponseHeader
responseHeader
,
SendMessageContext
sendMessageContext
,
ChannelHandlerContext
ctx
,
int
queueIdInt
)
{
final
RemotingCommand
request
,
final
MessageExt
msg
,
final
SendMessageResponseHeader
responseHeader
,
SendMessageContext
sendMessageContext
,
ChannelHandlerContext
ctx
,
int
queueIdInt
)
{
if
(
putMessageResult
==
null
)
{
response
.
setRemark
(
"push reply to requester success, but store putMessage return null"
);
return
;
return
;
}
boolean
sendOK
=
false
;
...
...
@@ -231,12 +257,12 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
case
PROPERTIES_SIZE_EXCEEDED:
// response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response
.
setRemark
(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."
);
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."
);
break
;
case
SERVICE_NOT_AVAILABLE:
// response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response
.
setRemark
(
"service not available now, maybe disk full, maybe your broker machine memory too small."
);
"service not available now, maybe disk full, maybe your broker machine memory too small."
);
break
;
case
OS_PAGECACHE_BUSY:
// response.setCode(ResponseCode.SYSTEM_ERROR);
...
...
@@ -256,7 +282,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
if
(
sendOK
)
{
this
.
brokerController
.
getBrokerStatsManager
().
incTopicPutNums
(
msg
.
getTopic
(),
putMessageResult
.
getAppendMessageResult
().
getMsgNum
(),
1
);
this
.
brokerController
.
getBrokerStatsManager
().
incTopicPutSize
(
msg
.
getTopic
(),
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
());
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
());
this
.
brokerController
.
getBrokerStatsManager
().
incBrokerPutNums
(
putMessageResult
.
getAppendMessageResult
().
getMsgNum
());
response
.
setRemark
(
null
);
responseHeader
.
setMsgId
(
putMessageResult
.
getAppendMessageResult
().
getMsgId
());
...
...
@@ -270,7 +296,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
int
commercialBaseCount
=
brokerController
.
getBrokerConfig
().
getCommercialBaseCount
();
int
wroteSize
=
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
();
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
)
*
commercialBaseCount
;
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
)
*
commercialBaseCount
;
sendMessageContext
.
setCommercialSendStats
(
BrokerStatsManager
.
StatsType
.
SEND_SUCCESS
);
sendMessageContext
.
setCommercialSendTimes
(
incValue
);
...
...
@@ -280,7 +306,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
}
else
{
if
(
hasSendMessageHook
())
{
int
wroteSize
=
request
.
getBody
().
length
;
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
);
int
incValue
=
(
int
)
Math
.
ceil
(
wroteSize
/
BrokerStatsManager
.
SIZE_PER_COUNT
);
sendMessageContext
.
setCommercialSendStats
(
BrokerStatsManager
.
StatsType
.
SEND_FAILURE
);
sendMessageContext
.
setCommercialSendTimes
(
incValue
);
...
...
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
浏览文件 @
46b2e125
...
...
@@ -34,11 +34,11 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.protocol.body.KVTable
;
import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper
;
import
org.apache.rocketmq.common.sysflag.TopicSysFlag
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
public
class
TopicConfigManager
extends
ConfigManager
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
...
...
client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
浏览文件 @
46b2e125
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.exception
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.help.FAQUrl
;
public
class
RequestTimeoutException
extends
Exception
{
private
static
final
long
serialVersionUID
=
-
5758410930844185841L
;
...
...
@@ -16,7 +32,7 @@ public class RequestTimeoutException extends Exception {
public
RequestTimeoutException
(
int
responseCode
,
String
errorMessage
)
{
super
(
"CODE: "
+
UtilAll
.
responseCode2String
(
responseCode
)
+
" DESC: "
+
errorMessage
);
+
errorMessage
);
this
.
responseCode
=
responseCode
;
this
.
errorMessage
=
errorMessage
;
}
...
...
client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
浏览文件 @
46b2e125
...
...
@@ -16,13 +16,12 @@
*/
package
org.apache.rocketmq.client.impl
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.io.IOException
;
import
java.net.InetSocketAddress
;
import
java.nio.ByteBuffer
;
import
java.util.HashMap
;
import
java.util.Map
;
import
io.netty.channel.ChannelHandlerContext
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.client.impl.producer.MQProducerInner
;
...
...
@@ -30,7 +29,11 @@ import org.apache.rocketmq.client.log.ClientLogger;
import
org.apache.rocketmq.client.producer.RequestFutureTable
;
import
org.apache.rocketmq.client.producer.RequestResponseFuture
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.message.*
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.NamespaceUtil
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
...
...
@@ -43,8 +46,8 @@ import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRe
import
org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader
;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
...
...
@@ -52,7 +55,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
public
class
ClientRemotingProcessor
implements
NettyRequestProcessor
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
private
final
MQClientInstance
mqClientFactory
;
...
...
@@ -221,7 +223,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
}
private
RemotingCommand
receiveReplyMssage
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
RemotingCommand
request
)
throws
RemotingCommandException
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
long
receiveTime
=
System
.
currentTimeMillis
();
...
...
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
浏览文件 @
46b2e125
...
...
@@ -27,7 +27,6 @@ import java.util.Map;
import
java.util.Properties
;
import
java.util.Set
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.consumer.PullCallback
;
...
...
@@ -304,8 +303,8 @@ public class MQClientAPIImpl {
requestHeader
.
setDefaultGroupPerm
(
plainAccessConfig
.
getDefaultGroupPerm
());
requestHeader
.
setDefaultTopicPerm
(
plainAccessConfig
.
getDefaultTopicPerm
());
requestHeader
.
setWhiteRemoteAddress
(
plainAccessConfig
.
getWhiteRemoteAddress
());
requestHeader
.
setTopicPerms
(
UtilAll
.
List2String
(
plainAccessConfig
.
getTopicPerms
(),
","
));
requestHeader
.
setGroupPerms
(
UtilAll
.
List2String
(
plainAccessConfig
.
getGroupPerms
(),
","
));
requestHeader
.
setTopicPerms
(
UtilAll
.
List2String
(
plainAccessConfig
.
getTopicPerms
(),
","
));
requestHeader
.
setGroupPerms
(
UtilAll
.
List2String
(
plainAccessConfig
.
getGroupPerms
(),
","
));
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
UPDATE_AND_CREATE_ACL_CONFIG
,
requestHeader
);
...
...
@@ -344,7 +343,7 @@ public class MQClientAPIImpl {
throw
new
MQClientException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
void
updateGlobalWhiteAddrsConfig
(
final
String
addr
,
final
String
globalWhiteAddrs
,
final
long
timeoutMillis
)
public
void
updateGlobalWhiteAddrsConfig
(
final
String
addr
,
final
String
globalWhiteAddrs
,
final
long
timeoutMillis
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
UpdateGlobalWhiteAddrsConfigRequestHeader
requestHeader
=
new
UpdateGlobalWhiteAddrsConfigRequestHeader
();
...
...
@@ -366,7 +365,8 @@ public class MQClientAPIImpl {
throw
new
MQClientException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
ClusterAclVersionInfo
getBrokerClusterAclInfo
(
final
String
addr
,
final
long
timeoutMillis
)
throws
RemotingCommandException
,
InterruptedException
,
RemotingTimeoutException
,
public
ClusterAclVersionInfo
getBrokerClusterAclInfo
(
final
String
addr
,
final
long
timeoutMillis
)
throws
RemotingCommandException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQBrokerException
{
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
GET_BROKER_CLUSTER_ACL_INFO
,
null
);
...
...
@@ -389,7 +389,7 @@ public class MQClientAPIImpl {
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
SendResult
sendMessage
(
...
...
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
浏览文件 @
46b2e125
...
...
@@ -103,19 +103,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
new
ConcurrentHashMap
<
String
,
TopicPublishInfo
>();
private
final
ArrayList
<
SendMessageHook
>
sendMessageHookList
=
new
ArrayList
<
SendMessageHook
>();
private
final
RPCHook
rpcHook
;
private
final
BlockingQueue
<
Runnable
>
asyncSenderThreadPoolQueue
;
private
final
ExecutorService
defaultAsyncSenderExecutor
;
private
final
Timer
timer
=
new
Timer
(
"RequestHouseKeepingService"
,
true
);
protected
BlockingQueue
<
Runnable
>
checkRequestQueue
;
protected
ExecutorService
checkExecutor
;
private
ServiceState
serviceState
=
ServiceState
.
CREATE_JUST
;
private
MQClientInstance
mQClientFactory
;
private
ArrayList
<
CheckForbiddenHook
>
checkForbiddenHookList
=
new
ArrayList
<
CheckForbiddenHook
>();
private
int
zipCompressLevel
=
Integer
.
parseInt
(
System
.
getProperty
(
MixAll
.
MESSAGE_COMPRESS_LEVEL
,
"5"
));
private
MQFaultStrategy
mqFaultStrategy
=
new
MQFaultStrategy
();
private
final
BlockingQueue
<
Runnable
>
asyncSenderThreadPoolQueue
;
private
final
ExecutorService
defaultAsyncSenderExecutor
;
private
ExecutorService
asyncSenderExecutor
;
private
final
Timer
timer
=
new
Timer
(
"RequestHouseKeepingService"
,
true
);
public
DefaultMQProducerImpl
(
final
DefaultMQProducer
defaultMQProducer
)
{
this
(
defaultMQProducer
,
null
);
...
...
@@ -291,6 +289,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
*
* @return
*/
@Override
...
...
@@ -484,13 +483,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* DEFAULT ASYNC -------------------------------------------------------
*/
public
void
send
(
Message
msg
,
SendCallback
sendCallback
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
SendCallback
sendCallback
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
send
(
msg
,
sendCallback
,
this
.
defaultMQProducer
.
getSendMsgTimeout
());
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
...
...
@@ -525,7 +525,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public
MessageQueue
selectOneMessageQueue
(
final
TopicPublishInfo
tpInfo
,
final
String
lastBrokerName
)
{
return
this
.
mqFaultStrategy
.
selectOneMessageQueue
(
tpInfo
,
lastBrokerName
);
}
...
...
@@ -701,11 +700,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private
SendResult
sendKernelImpl
(
final
Message
msg
,
final
MessageQueue
mq
,
final
CommunicationMode
communicationMode
,
final
SendCallback
sendCallback
,
final
TopicPublishInfo
topicPublishInfo
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
final
MessageQueue
mq
,
final
CommunicationMode
communicationMode
,
final
SendCallback
sendCallback
,
final
TopicPublishInfo
topicPublishInfo
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
long
beginStartTime
=
System
.
currentTimeMillis
();
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
());
if
(
null
==
brokerAddr
)
{
...
...
@@ -1010,8 +1009,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param mq
* @param sendCallback
...
...
@@ -1137,8 +1137,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param selector
* @param arg
...
...
@@ -1149,7 +1150,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* @throws InterruptedException
*/
@Deprecated
public
void
send
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
SendCallback
sendCallback
,
final
long
timeout
)
public
void
send
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
SendCallback
sendCallback
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
final
long
beginStartTime
=
System
.
currentTimeMillis
();
ExecutorService
executor
=
this
.
getAsyncSenderExecutor
();
...
...
@@ -1193,7 +1195,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public
TransactionSendResult
sendMessageInTransaction
(
final
Message
msg
,
final
LocalTransactionExecuter
localTransactionExecuter
,
final
Object
arg
)
final
LocalTransactionExecuter
localTransactionExecuter
,
final
Object
arg
)
throws
MQClientException
{
TransactionListener
transactionListener
=
getCheckListener
();
if
(
null
==
localTransactionExecuter
&&
null
==
transactionListener
)
{
...
...
@@ -1330,7 +1332,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return
this
.
sendDefaultImpl
(
msg
,
CommunicationMode
.
SYNC
,
null
,
timeout
);
}
public
Message
request
(
Message
msg
,
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
public
Message
request
(
Message
msg
,
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
prepareSendRequest
(
msg
,
timeout
);
final
String
requestUniqId
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_REQUEST_UNIQ_ID
);
...
...
@@ -1356,7 +1359,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if
(
responseMessage
==
null
)
{
if
(
requestResponseFuture
.
isSendReqeustOk
())
{
throw
new
RequestTimeoutException
(
ClientErrorCode
.
REQUEST_TIMEOUT_EXCEPTION
,
"send request message to <"
+
msg
.
getTopic
()
+
"> OK, but wait reply message timeout, "
+
timeout
+
" ms."
);
"send request message to <"
+
msg
.
getTopic
()
+
"> OK, but wait reply message timeout, "
+
timeout
+
" ms."
);
}
else
{
throw
new
MQClientException
(
"send request message to <"
+
msg
.
getTopic
()
+
"> fail"
,
requestResponseFuture
.
getCause
());
}
...
...
@@ -1394,8 +1397,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public
Message
request
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
prepareSendRequest
(
msg
,
timeout
);
final
String
requestUniqId
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_REQUEST_UNIQ_ID
);
...
...
@@ -1421,7 +1424,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if
(
responseMessage
==
null
)
{
if
(
requestResponseFuture
.
isSendReqeustOk
())
{
throw
new
RequestTimeoutException
(
ClientErrorCode
.
REQUEST_TIMEOUT_EXCEPTION
,
"send request message to <"
+
msg
.
getTopic
()
+
"> OK, but wait reply message timeout, "
+
timeout
+
" ms."
);
"send request message to <"
+
msg
.
getTopic
()
+
"> OK, but wait reply message timeout, "
+
timeout
+
" ms."
);
}
else
{
throw
new
MQClientException
(
"send request message to <"
+
msg
.
getTopic
()
+
"> fail"
,
requestResponseFuture
.
getCause
());
}
...
...
@@ -1433,7 +1436,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public
void
request
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
RemotingException
{
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
RemotingException
{
prepareSendRequest
(
msg
,
timeout
);
final
String
requestUniqId
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_REQUEST_UNIQ_ID
);
...
...
@@ -1460,7 +1463,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public
Message
request
(
final
Message
msg
,
final
MessageQueue
mq
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
prepareSendRequest
(
msg
,
timeout
);
final
String
requestUniqId
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_REQUEST_UNIQ_ID
);
...
...
@@ -1486,7 +1489,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if
(
responseMessage
==
null
)
{
if
(
requestResponseFuture
.
isSendReqeustOk
())
{
throw
new
RequestTimeoutException
(
ClientErrorCode
.
REQUEST_TIMEOUT_EXCEPTION
,
"send request message to <"
+
msg
.
getTopic
()
+
"> OK, but wait reply message timeout, "
+
timeout
+
" ms."
);
"send request message to <"
+
msg
.
getTopic
()
+
"> OK, but wait reply message timeout, "
+
timeout
+
" ms."
);
}
else
{
throw
new
MQClientException
(
"send request message to <"
+
msg
.
getTopic
()
+
"> fail"
,
requestResponseFuture
.
getCause
());
}
...
...
@@ -1498,7 +1501,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public
void
request
(
final
Message
msg
,
final
MessageQueue
mq
,
final
RequestCallback
requestCallback
,
long
timeout
)
throws
RemotingException
{
throws
RemotingException
{
prepareSendRequest
(
msg
,
timeout
);
final
String
requestUniqId
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_REQUEST_UNIQ_ID
);
...
...
@@ -1537,7 +1540,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
private
void
prepareSendRequest
(
final
Message
msg
,
long
timeout
){
private
void
prepareSendRequest
(
final
Message
msg
,
long
timeout
)
{
String
requestUniqId
=
RequestIdUtil
.
createUniqueRequestId
();
String
requestClientId
=
this
.
getmQClientFactory
().
getClientId
();
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_REQUEST_UNIQ_ID
,
requestUniqId
);
...
...
@@ -1556,11 +1559,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
private
SendResult
reply
(
final
Message
msg
,
long
timeoutMillis
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
{
private
SendResult
reply
(
final
Message
msg
,
long
timeoutMillis
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
{
return
this
.
sendDefaultImpl
(
msg
,
CommunicationMode
.
SYNC
,
null
,
timeoutMillis
);
}
private
SendResult
reply
(
final
Message
msg
,
final
SendCallback
sendCallback
,
long
timeoutMillis
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
{
private
SendResult
reply
(
final
Message
msg
,
final
SendCallback
sendCallback
,
long
timeoutMillis
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
{
return
this
.
sendDefaultImpl
(
msg
,
CommunicationMode
.
ASYNC
,
sendCallback
,
timeoutMillis
);
}
...
...
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
浏览文件 @
46b2e125
...
...
@@ -43,38 +43,29 @@ import org.apache.rocketmq.remoting.RPCHook;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
/**
* This class is the entry point for applications intending to send messages.
* </p>
* This class is the entry point for applications intending to send messages. </p>
*
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
* box for most scenarios.
* </p>
* box for most scenarios. </p>
*
* This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding.
* </p>
* cons; you'd better understand strengths and weakness of them before actually coding. </p>
*
* <p>
* <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context.
* </p>
* <p> <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context. </p>
*/
public
class
DefaultMQProducer
extends
ClientConfig
implements
MQProducer
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
protected
final
transient
DefaultMQProducerImpl
defaultMQProducerImpl
;
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved.
* </p>
* important when transactional messages are involved. </p>
*
* For non-transactional messages, it does not matter as long as it's unique per process.
* </p>
* For non-transactional messages, it does not matter as long as it's unique per process. </p>
*
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
...
...
@@ -101,16 +92,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
private
int
compressMsgBodyOverHowmuch
=
1024
*
4
;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* </p>
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private
int
retryTimesWhenSendFailed
=
2
;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
* </p>
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
...
...
@@ -269,14 +258,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Start this producer instance.
* </p>
* Start this producer instance. </p>
*
* <strong>
* Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
* this method before sending or querying messages.
* </strong>
* </p>
* <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
* to invoke this method before sending or querying messages. </strong> </p>
*
* @throws MQClientException if there is any unexpected error.
*/
...
...
@@ -317,8 +302,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Send message in synchronous mode. This method returns only when the sending procedure totally completes.
* </p>
* Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>
*
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
...
...
@@ -360,11 +344,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Send message to broker asynchronously.
* </p>
* Send message to broker asynchronously. </p>
*
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
* </p>
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
*
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
...
...
@@ -584,7 +566,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
*
* @param msg
* @param timeout
* @return
...
...
@@ -595,13 +576,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public
Message
request
(
final
Message
msg
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
RemotingException
,
MQBrokerException
,
InterruptedException
{
msg
.
setTopic
(
withNamespace
(
msg
.
getTopic
()));
return
this
.
defaultMQProducerImpl
.
request
(
msg
,
timeout
);
}
/**
*
* @param msg
* @param requestCallback
* @param timeout
...
...
@@ -612,12 +592,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public
void
request
(
final
Message
msg
,
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
InterruptedException
,
MQBrokerException
,
RequestTimeoutException
{
throws
MQClientException
,
RemotingException
,
InterruptedException
,
MQBrokerException
,
RequestTimeoutException
{
this
.
defaultMQProducerImpl
.
request
(
msg
,
requestCallback
,
timeout
);
}
/**
*
* @param msg
* @param selector
* @param arg
...
...
@@ -630,13 +609,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public
Message
request
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
return
this
.
defaultMQProducerImpl
.
request
(
msg
,
selector
,
arg
,
timeout
);
}
/**
*
* @param msg
* @param selector
* @param arg
...
...
@@ -649,13 +627,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public
void
request
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
this
.
defaultMQProducerImpl
.
request
(
msg
,
selector
,
arg
,
requestCallback
,
timeout
);
}
/**
*
* @param msg
* @param mq
* @param timeout
...
...
@@ -667,12 +644,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public
Message
request
(
final
Message
msg
,
final
MessageQueue
mq
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
,
RequestTimeoutException
{
return
this
.
defaultMQProducerImpl
.
request
(
msg
,
mq
,
timeout
);
}
/**
*
* @param msg
* @param mq
* @param requestCallback
...
...
@@ -684,7 +660,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public
void
request
(
final
Message
msg
,
final
MessageQueue
mq
,
final
RequestCallback
requestCallback
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
throws
MQClientException
,
RemotingException
,
InterruptedException
{
this
.
defaultMQProducerImpl
.
request
(
msg
,
mq
,
requestCallback
,
timeout
);
}
...
...
client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
浏览文件 @
46b2e125
...
...
@@ -102,22 +102,23 @@ public interface MQProducer extends MQAdmin {
//for rpc
Message
request
(
final
Message
msg
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
RemotingException
,
MQBrokerException
,
InterruptedException
;
void
request
(
final
Message
msg
,
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
InterruptedException
,
MQBrokerException
;
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
InterruptedException
,
MQBrokerException
;
Message
request
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
void
request
(
final
Message
msg
,
final
MessageQueueSelector
selector
,
final
Object
arg
,
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
InterruptedException
;
final
RequestCallback
requestCallback
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
InterruptedException
;
Message
request
(
final
Message
msg
,
final
MessageQueue
mq
,
final
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
;
void
request
(
final
Message
msg
,
final
MessageQueue
mq
,
final
RequestCallback
requestCallback
,
long
timeout
)
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
InterruptedException
;
throws
RequestTimeoutException
,
MQClientException
,
RemotingException
,
InterruptedException
;
}
client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
浏览文件 @
46b2e125
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.producer
;
import
org.apache.rocketmq.common.message.Message
;
...
...
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
浏览文件 @
46b2e125
package
org.apache.rocketmq.client.producer
;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import
org.apache.rocketmq.client.common.ClientErrorCode
;
import
org.apache.rocketmq.client.exception.RequestTimeoutException
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.logging.InternalLogger
;
package
org.apache.rocketmq.client.producer
;
import
java.util.Iterator
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.client.common.ClientErrorCode
;
import
org.apache.rocketmq.client.exception.RequestTimeoutException
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.logging.InternalLogger
;
public
class
RequestFutureTable
{
private
static
InternalLogger
log
=
ClientLogger
.
getLog
();
private
static
ConcurrentHashMap
<
String
,
RequestResponseFuture
>
requestFutureTable
=
new
ConcurrentHashMap
<
String
,
RequestResponseFuture
>();
public
static
ConcurrentHashMap
<
String
,
RequestResponseFuture
>
getRequestFutureTable
(){
public
static
ConcurrentHashMap
<
String
,
RequestResponseFuture
>
getRequestFutureTable
()
{
return
requestFutureTable
;
}
public
static
void
scanExpiredRequest
(){
public
static
void
scanExpiredRequest
()
{
final
List
<
RequestResponseFuture
>
rfList
=
new
LinkedList
<
RequestResponseFuture
>();
Iterator
<
Map
.
Entry
<
String
,
RequestResponseFuture
>>
it
=
requestFutureTable
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
...
...
client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
浏览文件 @
46b2e125
package
org.apache.rocketmq.client.producer
;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import
org.apache.rocketmq.common.message.Message
;
package
org.apache.rocketmq.client.producer
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
org.apache.rocketmq.common.message.Message
;
public
class
RequestResponseFuture
{
private
final
String
requestUniqId
;
private
long
timeoutMillis
;
private
final
RequestCallback
requestCallback
;
private
final
long
beginTimestamp
=
System
.
currentTimeMillis
();
private
final
Message
requestMsg
=
null
;
private
long
timeoutMillis
;
private
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
private
AtomicBoolean
ececuteCallbackOnlyOnce
=
new
AtomicBoolean
(
false
);
private
volatile
Message
responseMsg
=
null
;
private
volatile
boolean
sendReqeustOk
=
true
;
private
volatile
Throwable
cause
=
null
;
private
final
Message
requestMsg
=
null
;
public
RequestResponseFuture
(
String
requestUniqId
,
long
timeoutMillis
,
RequestCallback
requestCallback
){
public
RequestResponseFuture
(
String
requestUniqId
,
long
timeoutMillis
,
RequestCallback
requestCallback
)
{
this
.
requestUniqId
=
requestUniqId
;
this
.
timeoutMillis
=
timeoutMillis
;
this
.
requestCallback
=
requestCallback
;
}
public
void
executeRequestCallback
(){
public
void
executeRequestCallback
()
{
if
(
requestCallback
!=
null
)
{
if
(
sendReqeustOk
&&
cause
==
null
)
{
requestCallback
.
onSuccess
(
responseMsg
);
...
...
@@ -36,7 +50,7 @@ public class RequestResponseFuture {
}
}
public
boolean
isTimeout
(){
public
boolean
isTimeout
()
{
long
diff
=
System
.
currentTimeMillis
()
-
this
.
beginTimestamp
;
return
diff
>
this
.
timeoutMillis
;
}
...
...
@@ -46,7 +60,7 @@ public class RequestResponseFuture {
return
this
.
responseMsg
;
}
public
void
putResponseMessage
(
final
Message
responseMsg
){
public
void
putResponseMessage
(
final
Message
responseMsg
)
{
this
.
responseMsg
=
responseMsg
;
this
.
countDownLatch
.
countDown
();
}
...
...
client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
浏览文件 @
46b2e125
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.utils
;
import
org.apache.rocketmq.common.MixAll
;
...
...
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
浏览文件 @
46b2e125
...
...
@@ -769,7 +769,7 @@ public class BrokerConfig {
public
void
setMsgTraceTopicName
(
String
msgTraceTopicName
)
{
this
.
msgTraceTopicName
=
msgTraceTopicName
;
}
public
boolean
isTraceTopicEnable
()
{
return
traceTopicEnable
;
}
...
...
common/src/main/java/org/apache/rocketmq/common/MixAll.java
浏览文件 @
46b2e125
...
...
@@ -45,8 +45,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
public
class
MixAll
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
public
static
final
String
ROCKETMQ_HOME_ENV
=
"ROCKETMQ_HOME"
;
public
static
final
String
ROCKETMQ_HOME_PROPERTY
=
"rocketmq.home.dir"
;
public
static
final
String
NAMESRV_ADDR_ENV
=
"NAMESRV_ADDR"
;
...
...
@@ -74,30 +72,26 @@ public class MixAll {
public
static
final
String
CID_ONSAPI_OWNER_GROUP
=
"CID_ONSAPI_OWNER"
;
public
static
final
String
CID_ONSAPI_PULL_GROUP
=
"CID_ONSAPI_PULL"
;
public
static
final
String
CID_RMQ_SYS_PREFIX
=
"CID_RMQ_SYS_"
;
public
static
final
List
<
String
>
LOCAL_INET_ADDRESS
=
getLocalInetAddress
();
public
static
final
String
LOCALHOST
=
localhost
();
public
static
final
String
DEFAULT_CHARSET
=
"UTF-8"
;
public
static
final
long
MASTER_ID
=
0L
;
public
static
final
long
CURRENT_JVM_PID
=
getPID
();
public
static
final
String
RETRY_GROUP_TOPIC_PREFIX
=
"%RETRY%"
;
public
static
final
String
DLQ_GROUP_TOPIC_PREFIX
=
"%DLQ%"
;
public
static
final
String
REPLY_TOPIC_POSTFIX
=
"REPLY_TOPIC"
;
public
static
final
String
SYSTEM_TOPIC_PREFIX
=
"rmq_sys_"
;
public
static
final
String
UNIQUE_MSG_QUERY_FLAG
=
"_UNIQUE_KEY_QUERY"
;
public
static
final
String
DEFAULT_TRACE_REGION_ID
=
"DefaultRegion"
;
public
static
final
String
CONSUME_CONTEXT_TYPE
=
"ConsumeContextType"
;
public
static
final
String
RMQ_SYS_TRANS_HALF_TOPIC
=
"RMQ_SYS_TRANS_HALF_TOPIC"
;
public
static
final
String
RMQ_SYS_TRACE_TOPIC
=
"RMQ_SYS_TRACE_TOPIC"
;
public
static
final
String
RMQ_SYS_TRANS_OP_HALF_TOPIC
=
"RMQ_SYS_TRANS_OP_HALF_TOPIC"
;
public
static
final
String
TRANS_CHECK_MAX_TIME_TOPIC
=
"TRANS_CHECK_MAX_TIME_TOPIC"
;
public
static
final
String
CID_SYS_RMQ_TRANS
=
"CID_RMQ_SYS_TRANS"
;
public
static
final
String
ACL_CONF_TOOLS_FILE
=
"/conf/tools.yml"
;
public
static
final
String
REPLY_MESSAGE_FLAG
=
"reply"
;
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
public
static
String
getWSAddr
()
{
String
wsDomainName
=
System
.
getProperty
(
"rocketmq.namesrv.domain"
,
DEFAULT_NAMESRV_ADDR_LOOKUP
);
...
...
common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
浏览文件 @
46b2e125
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.protocol.header
;
import
org.apache.rocketmq.remoting.CommandCustomHeader
;
...
...
common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java
浏览文件 @
46b2e125
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.utils
;
import
java.util.UUID
;
public
class
RequestIdUtil
{
public
static
String
createUniqueRequestId
(){
public
static
String
createUniqueRequestId
()
{
return
UUID
.
randomUUID
().
toString
();
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录