Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
小五666\n哈哈
Rocketmq
提交
7fcf2f1d
R
Rocketmq
项目概览
小五666\n哈哈
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
7fcf2f1d
编写于
1月 11, 2017
作者:
Z
Zhanhui Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add javadoc to DefaultMQProducer
上级
6baa2ed5
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
198 addition
and
0 deletion
+198
-0
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
...rg/apache/rocketmq/client/producer/DefaultMQProducer.java
+198
-0
未找到文件。
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
浏览文件 @
7fcf2f1d
...
...
@@ -31,92 +31,290 @@ import org.apache.rocketmq.common.message.MessageQueue;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
/**
* This class is the entry point for applications intending to send messages.
* </p>
*
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
* box for most scenarios.
* </p>
*
* This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding.
* </p>
*
* <p>
* <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context.
* </p>
*/
public
class
DefaultMQProducer
extends
ClientConfig
implements
MQProducer
{
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
protected
final
transient
DefaultMQProducerImpl
defaultMQProducerImpl
;
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved.
* </p>
*
* For non-transactional messages, it does not matter as long as it's unique per process.
* </p>
*
* See {@linktourl http://rocketmq.incubator.apache.org/docs/core-concept/} for more discussion.
*/
private
String
producerGroup
;
/**
* Just for testing or demo program
*/
private
String
createTopicKey
=
MixAll
.
DEFAULT_TOPIC
;
/**
* Number of queues to create per default topic.
*/
private
volatile
int
defaultTopicQueueNums
=
4
;
/**
* Timeout for sending messages.
*/
private
int
sendMsgTimeout
=
3000
;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private
int
compressMsgBodyOverHowmuch
=
1024
*
4
;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private
int
retryTimesWhenSendFailed
=
2
;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
* </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private
int
retryTimesWhenSendAsyncFailed
=
2
;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private
boolean
retryAnotherBrokerWhenNotStoreOK
=
false
;
/**
* Maximum allowed message size in bytes.
*/
private
int
maxMessageSize
=
1024
*
1024
*
4
;
// 4M
/**
* Default constructor.
*/
public
DefaultMQProducer
()
{
this
(
MixAll
.
DEFAULT_PRODUCER_GROUP
,
null
);
}
/**
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public
DefaultMQProducer
(
final
String
producerGroup
,
RPCHook
rpcHook
)
{
this
.
producerGroup
=
producerGroup
;
defaultMQProducerImpl
=
new
DefaultMQProducerImpl
(
this
,
rpcHook
);
}
/**
* Constructor specifying producer group.
* @param producerGroup Producer group, see the name-sake field.
*/
public
DefaultMQProducer
(
final
String
producerGroup
)
{
this
(
producerGroup
,
null
);
}
/**
* Constructor specifying the RPC hook.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public
DefaultMQProducer
(
RPCHook
rpcHook
)
{
this
(
MixAll
.
DEFAULT_PRODUCER_GROUP
,
rpcHook
);
}
/**
* Start this producer instance.
* </p>
*
* <strong>
* Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
* this method before sending or querying messages.
* </strong>
* </p>
*
* @throws MQClientException if there is any unexpected error.
*/
@Override
public
void
start
()
throws
MQClientException
{
this
.
defaultMQProducerImpl
.
start
();
}
/**
* This method shuts down this producer instance and releases related resources.
*/
@Override
public
void
shutdown
()
{
this
.
defaultMQProducerImpl
.
shutdown
();
}
/**
* Fetch message queues of topic <code>topic</code>, to which we may send/publish messages.
* @param topic Topic to fetch.
* @return List of message queues readily to send messages to
* @throws MQClientException if there is any client error.
*/
@Override
public
List
<
MessageQueue
>
fetchPublishMessageQueues
(
String
topic
)
throws
MQClientException
{
return
this
.
defaultMQProducerImpl
.
fetchPublishMessageQueues
(
topic
);
}
/**
* Send message in synchronous mode. This method returns only when the sending procedure totally completes.
* </p>
*
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
*
* @param msg Message to send.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
SendResult
send
(
Message
msg
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
msg
);
}
/**
* Same to {@link #send(Message)} with send timeout specified in addition.
* @param msg Message to send.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
SendResult
send
(
Message
msg
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
msg
,
timeout
);
}
/**
* Send message to broker asynchronously.
* </p>
*
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
* </p>
*
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to
* {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication
* and application developers are the one to resolve this potential issue.
* @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
void
send
(
Message
msg
,
SendCallback
sendCallback
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
this
.
defaultMQProducerImpl
.
send
(
msg
,
sendCallback
);
}
/**
* Same to {@link #send(Message, SendCallback)} with send timeout specified in addition.
* @param msg message to send.
* @param sendCallback Callback to execute.
* @param timeout send timeout.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
void
send
(
Message
msg
,
SendCallback
sendCallback
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
this
.
defaultMQProducerImpl
.
send
(
msg
,
sendCallback
,
timeout
);
}
/**
* Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
* acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
* @param msg Message to send.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
void
sendOneway
(
Message
msg
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
this
.
defaultMQProducerImpl
.
sendOneway
(
msg
);
}
/**
* Same to {@link #send(Message)} with target message queue specified in addition.
* @param msg Message to send.
* @param mq Target message queue.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
SendResult
send
(
Message
msg
,
MessageQueue
mq
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
msg
,
mq
);
}
/**
* Same to {@link #send(Message)} with target message queue and send timeout specified.
*
* @param msg Message to send.
* @param mq Target message queue.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
SendResult
send
(
Message
msg
,
MessageQueue
mq
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
defaultMQProducerImpl
.
send
(
msg
,
mq
,
timeout
);
}
/**
* Same to {@link #send(Message, SendCallback)} with target message queue specified.
*
* @param msg Message to send.
* @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public
void
send
(
Message
msg
,
MessageQueue
mq
,
SendCallback
sendCallback
)
throws
MQClientException
,
RemotingException
,
InterruptedException
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录