Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
武汉红喜
whatsmars
提交
a8b88006
W
whatsmars
项目概览
武汉红喜
/
whatsmars
通知
3
Star
0
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
W
whatsmars
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a8b88006
编写于
12月 27, 2018
作者:
武汉红喜
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
RocketMQTemplate方法重载(加入keys)
上级
560e33ac
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
68 addition
and
16 deletion
+68
-16
whatsmars-mq/rocketmq-spring-boot-starter/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java
...apache/rocketmq/spring/starter/core/RocketMQTemplate.java
+65
-14
whatsmars-mq/whatsmars-mq-rocketmq/src/main/java/org/hongxi/whatsmars/rocketmq/boot/producer/ProducerApplication.java
...whatsmars/rocketmq/boot/producer/ProducerApplication.java
+3
-2
未找到文件。
whatsmars-mq/rocketmq-spring-boot-starter/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java
浏览文件 @
a8b88006
...
...
@@ -20,6 +20,7 @@ package org.apache.rocketmq.spring.starter.core;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
java.nio.charset.Charset
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Objects
;
...
...
@@ -121,12 +122,60 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return
syncSend
(
destination
,
payload
,
producer
.
getSendMsgTimeout
());
}
/**
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
* @param payload the Object to use as payload
* @param timeout send timeout with millis
* @return {@link SendResult}
*/
public
SendResult
syncSend
(
String
destination
,
Object
payload
,
long
timeout
)
{
Message
<?>
message
=
this
.
doConvert
(
payload
,
null
,
null
);
return
syncSend
(
destination
,
message
,
timeout
);
}
/**
* Same to {@link #syncSend(String, Object)}
*/
public
SendResult
syncSend
(
String
destination
,
Object
payload
,
String
keys
)
{
return
syncSend
(
destination
,
payload
,
producer
.
getSendMsgTimeout
(),
keys
);
}
/**
* Same to {@link #syncSend(String, Object, long)}
* keys 用于消息hash索引,方便查询,尽可能唯一
*/
public
SendResult
syncSend
(
String
destination
,
Object
payload
,
long
timeout
,
String
keys
)
{
Message
<?>
message
=
this
.
doConvert
(
payload
,
keys
);
return
syncSend
(
destination
,
message
,
timeout
);
}
/**
* Same to {@link #sendDelayed(String, Object, MessageDelayLevel)}
*/
public
SendResult
sendDelayed
(
String
destination
,
Object
payload
,
String
keys
,
MessageDelayLevel
delayLevel
)
{
Message
<?>
message
=
this
.
doConvert
(
payload
,
keys
);
return
sendDelayed
(
destination
,
message
,
producer
.
getSendMsgTimeout
(),
delayLevel
);
}
/**
* Same to {@link #sendDelayed(String, Message, MessageDelayLevel)}
*/
public
SendResult
sendDelayed
(
String
destination
,
Object
payload
,
MessageDelayLevel
delayLevel
)
{
Message
<?>
message
=
this
.
doConvert
(
payload
,
null
,
null
);
return
sendDelayed
(
destination
,
message
,
producer
.
getSendMsgTimeout
(),
delayLevel
);
}
/**
* Same to {@link #sendDelayed(String, Message, long, MessageDelayLevel)}
*/
public
SendResult
sendDelayed
(
String
destination
,
Message
<?>
message
,
MessageDelayLevel
delayLevel
)
{
return
sendDelayed
(
destination
,
message
,
producer
.
getSendMsgTimeout
(),
delayLevel
);
}
/**
* S
ame to {@link #syncSend(String, Message)} with send timeout specified in addition
.
* S
end delayed message
.
*
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
...
...
@@ -156,19 +205,6 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
}
/**
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
* @param payload the Object to use as payload
* @param timeout send timeout with millis
* @return {@link SendResult}
*/
public
SendResult
syncSend
(
String
destination
,
Object
payload
,
long
timeout
)
{
Message
<?>
message
=
this
.
doConvert
(
payload
,
null
,
null
);
return
syncSend
(
destination
,
message
,
timeout
);
}
/**
* Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
*
...
...
@@ -209,6 +245,14 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
}
/**
* Same to {@link #syncSendOrderly(String, Object, String)}
*/
public
SendResult
syncSendOrderly
(
String
destination
,
Object
payload
,
String
keys
,
String
hashKey
)
{
Message
<?>
message
=
this
.
doConvert
(
payload
,
keys
);
return
syncSendOrderly
(
destination
,
message
,
hashKey
);
}
/**
* Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
*
...
...
@@ -510,6 +554,13 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return
rocketMsg
;
}
private
Message
<?>
doConvert
(
Object
payload
,
String
keys
)
{
Map
<
String
,
Object
>
headers
=
new
HashMap
<>();
headers
.
put
(
MessageConst
.
PROPERTY_KEYS
,
keys
);
Message
<?>
message
=
this
.
doConvert
(
payload
,
headers
,
null
);
return
message
;
}
@Override
protected
Message
<?>
doConvert
(
Object
payload
,
Map
<
String
,
Object
>
headers
,
MessagePostProcessor
postProcessor
)
{
String
content
;
...
...
whatsmars-mq/whatsmars-mq-rocketmq/src/main/java/org/hongxi/whatsmars/rocketmq/boot/producer/ProducerApplication.java
浏览文件 @
a8b88006
...
...
@@ -33,10 +33,11 @@ public class ProducerApplication implements CommandLineRunner {
}
}
rocketMQTemplate
.
send
(
"test-topic-1"
,
MessageBuilder
.
withPayload
(
"Hello, World! I'm from spring message"
).
build
());
rocketMQTemplate
.
syncSend
(
"test-topic-1"
,
"Hello, World! I'm from simple message"
);
rocketMQTemplate
.
convertAndSend
(
"test-topic-2"
,
new
OrderPaidEvent
(
"T_001"
,
new
BigDecimal
(
"88.00"
)));
rocketMQTemplate
.
sendDelayed
(
"test-topic-1"
,
MessageBuilder
.
withPayload
(
"I'm delayed message"
).
build
()
,
MessageDelayLevel
.
TIME_1M
);
rocketMQTemplate
.
sendDelayed
(
"test-topic-1"
,
"I'm delayed message"
,
MessageDelayLevel
.
TIME_1M
);
rocketMQTemplate
.
sendOneWay
(
"test-topic-1"
,
MessageBuilder
.
withPayload
(
"I'm one way message"
).
build
());
rocketMQTemplate
.
syncSendOrderly
(
"test-topic-4"
,
MessageBuilder
.
withPayload
(
"I'm order message"
).
build
()
,
"1234"
);
rocketMQTemplate
.
syncSendOrderly
(
"test-topic-4"
,
"I'm order message"
,
"1234"
);
rocketMQTemplate
.
asyncSend
(
"test-topic-1"
,
MessageBuilder
.
withPayload
(
"I'm async message"
).
build
(),
new
SendCallback
()
{
@Override
public
void
onSuccess
(
SendResult
sendResult
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录