Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
f16fd08c
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
270
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
f16fd08c
编写于
10月 16, 2019
作者:
Q
qqeasonchen
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
rename REPLY_TO to REPLY_TO_CLIENT
上级
9800afb0
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
24 addition
and
14 deletion
+24
-14
broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
...ache/rocketmq/broker/processor/ReplyMessageProcessor.java
+3
-3
broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
.../rocketmq/broker/processor/ReplyMessageProcessorTest.java
+1
-3
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
.../rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+1
-1
client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
...in/java/org/apache/rocketmq/client/utils/MessageUtil.java
+6
-2
client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
...va/org/apache/rocketmq/client/utils/MessageUtilsTest.java
+10
-2
common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
...java/org/apache/rocketmq/common/message/MessageConst.java
+2
-2
example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
...ava/org/apache/rocketmq/example/rpc/ResponseConsumer.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
浏览文件 @
f16fd08c
...
@@ -173,7 +173,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
...
@@ -173,7 +173,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
PUSH_REPLY_MESSAGE_TO_CLIENT
,
replyMessageRequestHeader
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
PUSH_REPLY_MESSAGE_TO_CLIENT
,
replyMessageRequestHeader
);
request
.
setBody
(
msg
.
getBody
());
request
.
setBody
(
msg
.
getBody
());
String
senderId
=
msg
.
getProperties
().
get
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
);
String
senderId
=
msg
.
getProperties
().
get
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
);
PushReplyResult
pushReplyResult
=
new
PushReplyResult
(
false
);
PushReplyResult
pushReplyResult
=
new
PushReplyResult
(
false
);
if
(
senderId
!=
null
)
{
if
(
senderId
!=
null
)
{
...
@@ -207,9 +207,9 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
...
@@ -207,9 +207,9 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
log
.
warn
(
pushReplyResult
.
getRemark
());
log
.
warn
(
pushReplyResult
.
getRemark
());
}
}
}
else
{
}
else
{
log
.
warn
(
"REPLY_TO
is null, can not reply message"
);
log
.
warn
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO_CLIENT
+
"
is null, can not reply message"
);
pushReplyResult
.
setPushOk
(
false
);
pushReplyResult
.
setPushOk
(
false
);
pushReplyResult
.
setRemark
(
"reply message properties["
+
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
+
"] is null"
);
pushReplyResult
.
setRemark
(
"reply message properties["
+
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
+
"] is null"
);
}
}
return
pushReplyResult
;
return
pushReplyResult
;
}
}
...
...
broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
浏览文件 @
f16fd08c
...
@@ -30,7 +30,6 @@ import org.apache.rocketmq.common.BrokerConfig;
...
@@ -30,7 +30,6 @@ import org.apache.rocketmq.common.BrokerConfig;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
...
@@ -58,7 +57,6 @@ import org.mockito.junit.MockitoJUnitRunner;
...
@@ -58,7 +57,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
import
static
org
.
mockito
.
Mockito
.
when
;
...
@@ -126,7 +124,7 @@ public class ReplyMessageProcessorTest {
...
@@ -126,7 +124,7 @@ public class ReplyMessageProcessorTest {
requestHeader
.
setFlag
(
124
);
requestHeader
.
setFlag
(
124
);
requestHeader
.
setReconsumeTimes
(
0
);
requestHeader
.
setReconsumeTimes
(
0
);
Map
<
String
,
String
>
map
=
new
HashMap
<
String
,
String
>();
Map
<
String
,
String
>
map
=
new
HashMap
<
String
,
String
>();
map
.
put
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
,
"127.0.0.1"
);
map
.
put
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
,
"127.0.0.1"
);
requestHeader
.
setProperties
(
MessageDecoder
.
messageProperties2String
(
map
));
requestHeader
.
setProperties
(
MessageDecoder
.
messageProperties2String
(
map
));
return
requestHeader
;
return
requestHeader
;
}
}
...
...
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
浏览文件 @
f16fd08c
...
@@ -1548,7 +1548,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
...
@@ -1548,7 +1548,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String
correlationId
=
CorrelationIdUtil
.
createCorrelationId
();
String
correlationId
=
CorrelationIdUtil
.
createCorrelationId
();
String
requestClientId
=
this
.
getmQClientFactory
().
getClientId
();
String
requestClientId
=
this
.
getmQClientFactory
().
getClientId
();
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_CORRELATION_ID
,
correlationId
);
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_CORRELATION_ID
,
correlationId
);
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
,
requestClientId
);
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
,
requestClientId
);
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_MESSAGE_TTL
,
String
.
valueOf
(
timeout
));
MessageAccessor
.
putProperty
(
msg
,
MessageConst
.
PROPERTY_MESSAGE_TTL
,
String
.
valueOf
(
timeout
));
boolean
hasRouteData
=
this
.
getmQClientFactory
().
getTopicRouteTable
().
containsKey
(
msg
.
getTopic
());
boolean
hasRouteData
=
this
.
getmQClientFactory
().
getTopicRouteTable
().
containsKey
(
msg
.
getTopic
());
...
...
client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
浏览文件 @
f16fd08c
...
@@ -29,7 +29,7 @@ public class MessageUtil {
...
@@ -29,7 +29,7 @@ public class MessageUtil {
if
(
requestMessage
!=
null
)
{
if
(
requestMessage
!=
null
)
{
Message
replyMessage
=
new
Message
();
Message
replyMessage
=
new
Message
();
String
cluster
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_CLUSTER
);
String
cluster
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_CLUSTER
);
String
replyTo
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
);
String
replyTo
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
);
String
correlationId
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_CORRELATION_ID
);
String
correlationId
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_CORRELATION_ID
);
String
ttl
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_TTL
);
String
ttl
=
requestMessage
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_TTL
);
replyMessage
.
setBody
(
body
);
replyMessage
.
setBody
(
body
);
...
@@ -38,7 +38,7 @@ public class MessageUtil {
...
@@ -38,7 +38,7 @@ public class MessageUtil {
replyMessage
.
setTopic
(
replyTopic
);
replyMessage
.
setTopic
(
replyTopic
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_MESSAGE_TYPE
,
MixAll
.
REPLY_MESSAGE_FLAG
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_MESSAGE_TYPE
,
MixAll
.
REPLY_MESSAGE_FLAG
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_CORRELATION_ID
,
correlationId
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_CORRELATION_ID
,
correlationId
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
,
replyTo
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
,
replyTo
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_MESSAGE_TTL
,
ttl
);
MessageAccessor
.
putProperty
(
replyMessage
,
MessageConst
.
PROPERTY_MESSAGE_TTL
,
ttl
);
return
replyMessage
;
return
replyMessage
;
...
@@ -48,4 +48,8 @@ public class MessageUtil {
...
@@ -48,4 +48,8 @@ public class MessageUtil {
}
}
throw
new
MQClientException
(
ClientErrorCode
.
CREATE_REPLY_MESSAGE_EXCEPTION
,
"create reply message fail, requestMessage cannot be null."
);
throw
new
MQClientException
(
ClientErrorCode
.
CREATE_REPLY_MESSAGE_EXCEPTION
,
"create reply message fail, requestMessage cannot be null."
);
}
}
public
static
String
getReplyToClient
(
final
Message
msg
)
{
return
msg
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO_CLIENT
);
}
}
}
client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
浏览文件 @
f16fd08c
...
@@ -35,7 +35,7 @@ public class MessageUtilsTest {
...
@@ -35,7 +35,7 @@ public class MessageUtilsTest {
public
void
testCreateReplyMessage
()
throws
MQClientException
{
public
void
testCreateReplyMessage
()
throws
MQClientException
{
Message
msg
=
MessageUtil
.
createReplyMessage
(
createReplyMessage
(
"clusterName"
),
new
byte
[]
{
'a'
});
Message
msg
=
MessageUtil
.
createReplyMessage
(
createReplyMessage
(
"clusterName"
),
new
byte
[]
{
'a'
});
assertThat
(
msg
.
getTopic
()).
isEqualTo
(
"clusterName"
+
"_"
+
MixAll
.
REPLY_TOPIC_POSTFIX
);
assertThat
(
msg
.
getTopic
()).
isEqualTo
(
"clusterName"
+
"_"
+
MixAll
.
REPLY_TOPIC_POSTFIX
);
assertThat
(
msg
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
)).
isEqualTo
(
"127.0.0.1"
);
assertThat
(
msg
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
)).
isEqualTo
(
"127.0.0.1"
);
assertThat
(
msg
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_TTL
)).
isEqualTo
(
"3000"
);
assertThat
(
msg
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_TTL
)).
isEqualTo
(
"3000"
);
}
}
...
@@ -59,10 +59,18 @@ public class MessageUtilsTest {
...
@@ -59,10 +59,18 @@ public class MessageUtilsTest {
}
}
}
}
@Test
public
void
testGetReplyToClient
()
throws
MQClientException
{
Message
msg
=
createReplyMessage
(
"clusterName"
);
String
replyToClient
=
MessageUtil
.
getReplyToClient
(
msg
);
assertThat
(
replyToClient
).
isNotNull
();
assertThat
(
replyToClient
).
isEqualTo
(
"127.0.0.1"
);
}
private
Message
createReplyMessage
(
String
clusterName
)
{
private
Message
createReplyMessage
(
String
clusterName
)
{
Message
requestMessage
=
new
Message
();
Message
requestMessage
=
new
Message
();
Map
map
=
new
HashMap
<
String
,
String
>();
Map
map
=
new
HashMap
<
String
,
String
>();
map
.
put
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
,
"127.0.0.1"
);
map
.
put
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
,
"127.0.0.1"
);
map
.
put
(
MessageConst
.
PROPERTY_CLUSTER
,
clusterName
);
map
.
put
(
MessageConst
.
PROPERTY_CLUSTER
,
clusterName
);
map
.
put
(
MessageConst
.
PROPERTY_MESSAGE_TTL
,
"3000"
);
map
.
put
(
MessageConst
.
PROPERTY_MESSAGE_TTL
,
"3000"
);
MessageAccessor
.
setProperties
(
requestMessage
,
map
);
MessageAccessor
.
setProperties
(
requestMessage
,
map
);
...
...
common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
浏览文件 @
f16fd08c
...
@@ -46,7 +46,7 @@ public class MessageConst {
...
@@ -46,7 +46,7 @@ public class MessageConst {
public
static
final
String
PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS
=
"CHECK_IMMUNITY_TIME_IN_SECONDS"
;
public
static
final
String
PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS
=
"CHECK_IMMUNITY_TIME_IN_SECONDS"
;
public
static
final
String
PROPERTY_INSTANCE_ID
=
"INSTANCE_ID"
;
public
static
final
String
PROPERTY_INSTANCE_ID
=
"INSTANCE_ID"
;
public
static
final
String
PROPERTY_CORRELATION_ID
=
"CORRELATION_ID"
;
public
static
final
String
PROPERTY_CORRELATION_ID
=
"CORRELATION_ID"
;
public
static
final
String
PROPERTY_MESSAGE_REPLY_TO
=
"REPLY_TO
"
;
public
static
final
String
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
=
"REPLY_TO_CLIENT
"
;
public
static
final
String
PROPERTY_MESSAGE_TTL
=
"TTL"
;
public
static
final
String
PROPERTY_MESSAGE_TTL
=
"TTL"
;
public
static
final
String
PROPERTY_REPLY_MESSAGE_ARRIVE_TIME
=
"ARRIVE_TIME"
;
public
static
final
String
PROPERTY_REPLY_MESSAGE_ARRIVE_TIME
=
"ARRIVE_TIME"
;
public
static
final
String
PROPERTY_PUSH_REPLY_TIME
=
"PUSH_REPLY_TIME"
;
public
static
final
String
PROPERTY_PUSH_REPLY_TIME
=
"PUSH_REPLY_TIME"
;
...
@@ -82,7 +82,7 @@ public class MessageConst {
...
@@ -82,7 +82,7 @@ public class MessageConst {
STRING_HASH_SET
.
add
(
PROPERTY_CONSUME_START_TIMESTAMP
);
STRING_HASH_SET
.
add
(
PROPERTY_CONSUME_START_TIMESTAMP
);
STRING_HASH_SET
.
add
(
PROPERTY_INSTANCE_ID
);
STRING_HASH_SET
.
add
(
PROPERTY_INSTANCE_ID
);
STRING_HASH_SET
.
add
(
PROPERTY_CORRELATION_ID
);
STRING_HASH_SET
.
add
(
PROPERTY_CORRELATION_ID
);
STRING_HASH_SET
.
add
(
PROPERTY_MESSAGE_REPLY_TO
);
STRING_HASH_SET
.
add
(
PROPERTY_MESSAGE_REPLY_TO
_CLIENT
);
STRING_HASH_SET
.
add
(
PROPERTY_MESSAGE_TTL
);
STRING_HASH_SET
.
add
(
PROPERTY_MESSAGE_TTL
);
STRING_HASH_SET
.
add
(
PROPERTY_REPLY_MESSAGE_ARRIVE_TIME
);
STRING_HASH_SET
.
add
(
PROPERTY_REPLY_MESSAGE_ARRIVE_TIME
);
STRING_HASH_SET
.
add
(
PROPERTY_PUSH_REPLY_TIME
);
STRING_HASH_SET
.
add
(
PROPERTY_PUSH_REPLY_TIME
);
...
...
example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
浏览文件 @
f16fd08c
...
@@ -57,7 +57,7 @@ public class ResponseConsumer {
...
@@ -57,7 +57,7 @@ public class ResponseConsumer {
for
(
MessageExt
msg
:
msgs
)
{
for
(
MessageExt
msg
:
msgs
)
{
try
{
try
{
System
.
out
.
printf
(
"handle message: %s"
,
msg
.
toString
());
System
.
out
.
printf
(
"handle message: %s"
,
msg
.
toString
());
String
replyTo
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_MESSAGE_REPLY_TO
);
String
replyTo
=
MessageUtil
.
getReplyToClient
(
msg
);
byte
[]
replyContent
=
"reply message contents."
.
getBytes
();
byte
[]
replyContent
=
"reply message contents."
.
getBytes
();
// create reply message with given util, do not create reply message by yourself
// create reply message with given util, do not create reply message by yourself
Message
replyMessage
=
MessageUtil
.
createReplyMessage
(
msg
,
replyContent
);
Message
replyMessage
=
MessageUtil
.
createReplyMessage
(
msg
,
replyContent
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录