提交 8488207c 编写于 作者: H huangying

[#1540] format checkstyle

上级 1b012a3c
...@@ -34,7 +34,6 @@ import java.util.concurrent.ThreadFactory; ...@@ -34,7 +34,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.common.ClientErrorCode;
...@@ -89,1288 +88,1288 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; ...@@ -89,1288 +88,1288 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
public class DefaultMQProducerImpl implements MQProducerInner { public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final Random random = new Random(); private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer; private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>(); new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook; private final RPCHook rpcHook;
protected BlockingQueue<Runnable> checkRequestQueue; protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor; protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST; private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory; private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor; private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor; private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null); this(defaultMQProducer, null);
} }
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer; this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
1000 * 60, 1000 * 60,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue, this.asyncSenderThreadPoolQueue,
new ThreadFactory() { new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
} }
}); });
} }
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
this.checkForbiddenHookList.add(checkForbiddenHook); this.checkForbiddenHookList.add(checkForbiddenHook);
log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(),
checkForbiddenHookList.size()); checkForbiddenHookList.size());
} }
public void initTransactionEnv() { public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) { if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService(); this.checkExecutor = producer.getExecutorService();
} else { } else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor( this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(), producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(), producer.getCheckThreadPoolMaxSize(),
1000 * 60, 1000 * 60,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
this.checkRequestQueue); this.checkRequestQueue);
} }
} }
public void destroyTransactionEnv() { public void destroyTransactionEnv() {
if (this.checkExecutor != null) { if (this.checkExecutor != null) {
this.checkExecutor.shutdown(); this.checkExecutor.shutdown();
} }
} }
public void registerSendMessageHook(final SendMessageHook hook) { public void registerSendMessageHook(final SendMessageHook hook) {
this.sendMessageHookList.add(hook); this.sendMessageHookList.add(hook);
log.info("register sendMessage Hook, {}", hook.hookName()); log.info("register sendMessage Hook, {}", hook.hookName());
} }
public void start() throws MQClientException { public void start() throws MQClientException {
this.start(true); this.start(true);
} }
public void start(final boolean startFactory) throws MQClientException { public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) { switch (this.serviceState) {
case CREATE_JUST: case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED; this.serviceState = ServiceState.START_FAILED;
this.checkConfig(); this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID(); this.defaultMQProducer.changeInstanceNameToPID();
} }
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) { if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST; this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null); null);
} }
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) { if (startFactory) {
mQClientFactory.start(); mQClientFactory.start();
} }
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel()); this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING; this.serviceState = ServiceState.RUNNING;
break; break;
case RUNNING: case RUNNING:
case START_FAILED: case START_FAILED:
case SHUTDOWN_ALREADY: case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, " throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null); null);
default: default:
break; break;
} }
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
} }
private void checkConfig() throws MQClientException { private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
if (null == this.defaultMQProducer.getProducerGroup()) { if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", null); throw new MQClientException("producerGroup is null", null);
} }
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
null); null);
} }
} }
public void shutdown() { public void shutdown() {
this.shutdown(true); this.shutdown(true);
} }
public void shutdown(final boolean shutdownFactory) { public void shutdown(final boolean shutdownFactory) {
switch (this.serviceState) { switch (this.serviceState) {
case CREATE_JUST: case CREATE_JUST:
break; break;
case RUNNING: case RUNNING:
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown(); this.defaultAsyncSenderExecutor.shutdown();
if (shutdownFactory) { if (shutdownFactory) {
this.mQClientFactory.shutdown(); this.mQClientFactory.shutdown();
} }
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY; this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break; break;
case SHUTDOWN_ALREADY: case SHUTDOWN_ALREADY:
break; break;
default: default:
break; break;
} }
} }
@Override @Override
public Set<String> getPublishTopicList() { public Set<String> getPublishTopicList() {
Set<String> topicList = new HashSet<String>(); Set<String> topicList = new HashSet<String>();
for (String key : this.topicPublishInfoTable.keySet()) { for (String key : this.topicPublishInfoTable.keySet()) {
topicList.add(key); topicList.add(key);
} }
return topicList; return topicList;
} }
@Override @Override
public boolean isPublishTopicNeedUpdate(String topic) { public boolean isPublishTopicNeedUpdate(String topic) {
TopicPublishInfo prev = this.topicPublishInfoTable.get(topic); TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);
return null == prev || !prev.ok(); return null == prev || !prev.ok();
} }
/** /**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended. * This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
* *
* @return * @return
*/ */
@Override @Override
@Deprecated @Deprecated
public TransactionCheckListener checkListener() { public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) { if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionCheckListener(); return producer.getTransactionCheckListener();
} }
return null; return null;
} }
@Override @Override
public TransactionListener getCheckListener() { public TransactionListener getCheckListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) { if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener(); return producer.getTransactionListener();
} }
return null; return null;
} }
@Override @Override
public void checkTransactionState(final String addr, final MessageExt msg, public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) { final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() { Runnable request = new Runnable() {
private final String brokerAddr = addr; private final String brokerAddr = addr;
private final MessageExt message = msg; private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header; private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override @Override
public void run() { public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener(); TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) { if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null; Throwable exception = null;
try { try {
if (transactionCheckListener != null) { if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message); localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) { } else if (transactionListener != null) {
log.debug("Used new check API in transaction message"); log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message); localTransactionState = transactionListener.checkLocalTransaction(message);
} else { } else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
} }
} catch (Throwable e) { } catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e; exception = e;
} }
this.processTransactionState( this.processTransactionState(
localTransactionState, localTransactionState,
group, group,
exception); exception);
} else { } else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
} }
} }
private void processTransactionState( private void processTransactionState(
final LocalTransactionState localTransactionState, final LocalTransactionState localTransactionState,
final String producerGroup, final String producerGroup,
final Throwable exception) { final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup); thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true); thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) { if (uniqueKey == null) {
uniqueKey = message.getMsgId(); uniqueKey = message.getMsgId();
} }
thisHeader.setMsgId(uniqueKey); thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) { switch (localTransactionState) {
case COMMIT_MESSAGE: case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break; break;
case ROLLBACK_MESSAGE: case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader); log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break; break;
case UNKNOW: case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader); log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break; break;
default: default:
break; break;
} }
String remark = null; String remark = null;
if (exception != null) { if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
} }
try { try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000); 3000);
} catch (Exception e) { } catch (Exception e) {
log.error("endTransactionOneway exception", e); log.error("endTransactionOneway exception", e);
} }
} }
}; };
this.checkExecutor.submit(request); this.checkExecutor.submit(request);
} }
@Override @Override
public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) { public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
if (info != null && topic != null) { if (info != null && topic != null) {
TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info); TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
if (prev != null) { if (prev != null) {
log.info("updateTopicPublishInfo prev is not null, " + prev.toString()); log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
} }
} }
} }
@Override @Override
public boolean isUnitMode() { public boolean isUnitMode() {
return this.defaultMQProducer.isUnitMode(); return this.defaultMQProducer.isUnitMode();
} }
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0); createTopic(key, newTopic, queueNum, 0);
} }
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkTopic(newTopic); Validators.checkTopic(newTopic);
this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
} }
private void makeSureStateOK() throws MQClientException { private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) { if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The producer service state not OK, " throw new MQClientException("The producer service state not OK, "
+ this.serviceState + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null); null);
} }
} }
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
} }
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} }
public long maxOffset(MessageQueue mq) throws MQClientException { public long maxOffset(MessageQueue mq) throws MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} }
public long minOffset(MessageQueue mq) throws MQClientException { public long minOffset(MessageQueue mq) throws MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().minOffset(mq); return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
} }
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
} }
public MessageExt viewMessage( public MessageExt viewMessage(
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
} }
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException { throws MQClientException, InterruptedException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
} }
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
throws MQClientException, InterruptedException { throws MQClientException, InterruptedException {
this.makeSureStateOK(); this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
} }
/** /**
* DEFAULT ASYNC ------------------------------------------------------- * DEFAULT ASYNC -------------------------------------------------------
*/ */
public void send(Message msg, public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
} }
/** /**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version * provided in next version
* *
* @param msg * @param msg
* @param sendCallback * @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time * @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws RejectedExecutionException * @throws RejectedExecutionException
*/ */
@Deprecated @Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) { if (timeout > costTime) {
try { try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) { } catch (Exception e) {
sendCallback.onException(e); sendCallback.onException(e);
} }
} else { } else {
sendCallback.onException( sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
} }
} }
}); });
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e); throw new MQClientException("executor rejected ", e);
} }
} }
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
} }
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
} }
private void validateNameServerSetting() throws MQClientException { private void validateNameServerSetting() throws MQClientException {
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) { if (null == nsList || nsList.isEmpty()) {
throw new MQClientException( throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
} }
} }
private SendResult sendDefaultImpl( private SendResult sendDefaultImpl(
Message msg, Message msg,
final CommunicationMode communicationMode, final CommunicationMode communicationMode,
final SendCallback sendCallback, final SendCallback sendCallback,
final long timeout final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong(); final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst; long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst; long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) { if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false; boolean callTimeout = false;
MessageQueue mq = null; MessageQueue mq = null;
Exception exception = null; Exception exception = null;
SendResult sendResult = null; SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0; int times = 0;
String[] brokersSent = new String[timesTotal]; String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) { for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName(); String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) { if (mqSelected != null) {
mq = mqSelected; mq = mqSelected;
brokersSent[times] = mq.getBrokerName(); brokersSent[times] = mq.getBrokerName();
try { try {
beginTimestampPrev = System.currentTimeMillis(); beginTimestampPrev = System.currentTimeMillis();
if (times > 0) { if (times > 0) {
//Reset topic with namespace during resend. //Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
} }
long costTime = beginTimestampPrev - beginTimestampFirst; long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) { if (timeout < costTime) {
callTimeout = true; callTimeout = true;
break; break;
} }
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) { switch (communicationMode) {
case ASYNC: case ASYNC:
return null; return null;
case ONEWAY: case ONEWAY:
return null; return null;
case SYNC: case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue; continue;
} }
} }
return sendResult; return sendResult;
default: default:
break; break;
} }
} catch (RemotingException e) { } catch (RemotingException e) {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString()); log.warn(msg.toString());
exception = e; exception = e;
continue; continue;
} catch (MQClientException e) { } catch (MQClientException e) {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString()); log.warn(msg.toString());
exception = e; exception = e;
continue; continue;
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString()); log.warn(msg.toString());
exception = e; exception = e;
switch (e.getResponseCode()) { switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR: case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION: case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID: case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT: case ResponseCode.NOT_IN_CURRENT_UNIT:
continue; continue;
default: default:
if (sendResult != null) { if (sendResult != null) {
return sendResult; return sendResult;
} }
throw e; throw e;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString()); log.warn(msg.toString());
log.warn("sendKernelImpl exception", e); log.warn("sendKernelImpl exception", e);
log.warn(msg.toString()); log.warn(msg.toString());
throw e; throw e;
} }
} else { } else {
break; break;
} }
} }
if (sendResult != null) { if (sendResult != null) {
return sendResult; return sendResult;
} }
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times, times,
System.currentTimeMillis() - beginTimestampFirst, System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(), msg.getTopic(),
Arrays.toString(brokersSent)); Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception); MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) { if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
} }
if (exception instanceof MQBrokerException) { if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) { } else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) { } else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) { } else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
} }
throw mqClientException; throw mqClientException;
} }
validateNameServerSetting(); validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
} }
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) { if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic);
} }
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo; return topicPublishInfo;
} else { } else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo; return topicPublishInfo;
} }
} }
private SendResult sendKernelImpl(final Message msg, private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq, final MessageQueue mq,
final CommunicationMode communicationMode, final CommunicationMode communicationMode,
final SendCallback sendCallback, final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo, final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) { if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic()); tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
} }
SendMessageContext context = null; SendMessageContext context = null;
if (brokerAddr != null) { if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody(); byte[] prevBody = msg.getBody();
try { try {
//for MessageBatch,ID has been set in the generating process //for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) { if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg); MessageClientIDSetter.setUniqID(msg);
} }
boolean topicWithNamespace = false; boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) { if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true; topicWithNamespace = true;
} }
int sysFlag = 0; int sysFlag = 0;
boolean msgBodyCompressed = false; boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) { if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG; sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true; msgBodyCompressed = true;
} }
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
} }
if (hasCheckForbiddenHook()) { if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq); checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode()); checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext); this.executeCheckForbiddenHook(checkForbiddenContext);
} }
if (this.hasSendMessageHook()) { if (this.hasSendMessageHook()) {
context = new SendMessageContext(); context = new SendMessageContext();
context.setProducer(this); context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode); context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr); context.setBrokerAddr(brokerAddr);
context.setMessage(msg); context.setMessage(msg);
context.setMq(mq); context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace()); context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) { if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half); context.setMsgType(MessageType.Trans_Msg_Half);
} }
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg); context.setMsgType(MessageType.Delay_Msg);
} }
this.executeSendMessageHookBefore(context); this.executeSendMessageHookBefore(context);
} }
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic()); requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag); requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag()); requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0); requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch); requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) { if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
} }
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) { if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
} }
} }
SendResult sendResult = null; SendResult sendResult = null;
switch (communicationMode) { switch (communicationMode) {
case ASYNC: case ASYNC:
Message tmpMessage = msg; Message tmpMessage = msg;
boolean messageCloned = false; boolean messageCloned = false;
if (msgBodyCompressed) { if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody. //If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage. //Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66 //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg); tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true; messageCloned = true;
msg.setBody(prevBody); msg.setBody(prevBody);
} }
if (topicWithNamespace) { if (topicWithNamespace) {
if (!messageCloned) { if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg); tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true; messageCloned = true;
} }
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
} }
long costTimeAsync = System.currentTimeMillis() - beginStartTime; long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) { if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
} }
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, brokerAddr,
mq.getBrokerName(), mq.getBrokerName(),
tmpMessage, tmpMessage,
requestHeader, requestHeader,
timeout - costTimeAsync, timeout - costTimeAsync,
communicationMode, communicationMode,
sendCallback, sendCallback,
topicPublishInfo, topicPublishInfo,
this.mQClientFactory, this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context, context,
this); this);
break; break;
case ONEWAY: case ONEWAY:
case SYNC: case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime; long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) { if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
} }
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, brokerAddr,
mq.getBrokerName(), mq.getBrokerName(),
msg, msg,
requestHeader, requestHeader,
timeout - costTimeSync, timeout - costTimeSync,
communicationMode, communicationMode,
context, context,
this); this);
break; break;
default: default:
assert false; assert false;
break; break;
} }
if (this.hasSendMessageHook()) { if (this.hasSendMessageHook()) {
context.setSendResult(sendResult); context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context); this.executeSendMessageHookAfter(context);
} }
return sendResult; return sendResult;
} catch (RemotingException e) { } catch (RemotingException e) {
if (this.hasSendMessageHook()) { if (this.hasSendMessageHook()) {
context.setException(e); context.setException(e);
this.executeSendMessageHookAfter(context); this.executeSendMessageHookAfter(context);
} }
throw e; throw e;
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
if (this.hasSendMessageHook()) { if (this.hasSendMessageHook()) {
context.setException(e); context.setException(e);
this.executeSendMessageHookAfter(context); this.executeSendMessageHookAfter(context);
} }
throw e; throw e;
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (this.hasSendMessageHook()) { if (this.hasSendMessageHook()) {
context.setException(e); context.setException(e);
this.executeSendMessageHookAfter(context); this.executeSendMessageHookAfter(context);
} }
throw e; throw e;
} finally { } finally {
msg.setBody(prevBody); msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
} }
} }
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
} }
public MQClientInstance getmQClientFactory() { public MQClientInstance getmQClientFactory() {
return mQClientFactory; return mQClientFactory;
} }
private boolean tryToCompressMessage(final Message msg) { private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) { if (msg instanceof MessageBatch) {
//batch dose not support compressing right now //batch dose not support compressing right now
return false; return false;
} }
byte[] body = msg.getBody(); byte[] body = msg.getBody();
if (body != null) { if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try { try {
byte[] data = UtilAll.compress(body, zipCompressLevel); byte[] data = UtilAll.compress(body, zipCompressLevel);
if (data != null) { if (data != null) {
msg.setBody(data); msg.setBody(data);
return true; return true;
} }
} catch (IOException e) { } catch (IOException e) {
log.error("tryToCompressMessage exception", e); log.error("tryToCompressMessage exception", e);
log.warn(msg.toString()); log.warn(msg.toString());
} }
} }
} }
return false; return false;
} }
public boolean hasCheckForbiddenHook() { public boolean hasCheckForbiddenHook() {
return !checkForbiddenHookList.isEmpty(); return !checkForbiddenHookList.isEmpty();
} }
public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException { public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException {
if (hasCheckForbiddenHook()) { if (hasCheckForbiddenHook()) {
for (CheckForbiddenHook hook : checkForbiddenHookList) { for (CheckForbiddenHook hook : checkForbiddenHookList) {
hook.checkForbidden(context); hook.checkForbidden(context);
} }
} }
} }
public boolean hasSendMessageHook() { public boolean hasSendMessageHook() {
return !this.sendMessageHookList.isEmpty(); return !this.sendMessageHookList.isEmpty();
} }
public void executeSendMessageHookBefore(final SendMessageContext context) { public void executeSendMessageHookBefore(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) { if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) { for (SendMessageHook hook : this.sendMessageHookList) {
try { try {
hook.sendMessageBefore(context); hook.sendMessageBefore(context);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("failed to executeSendMessageHookBefore", e); log.warn("failed to executeSendMessageHookBefore", e);
} }
} }
} }
} }
public void executeSendMessageHookAfter(final SendMessageContext context) { public void executeSendMessageHookAfter(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) { if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) { for (SendMessageHook hook : this.sendMessageHookList) {
try { try {
hook.sendMessageAfter(context); hook.sendMessageAfter(context);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("failed to executeSendMessageHookAfter", e); log.warn("failed to executeSendMessageHookAfter", e);
} }
} }
} }
} }
/** /**
* DEFAULT ONEWAY ------------------------------------------------------- * DEFAULT ONEWAY -------------------------------------------------------
*/ */
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
try { try {
this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e); throw new MQClientException("unknown exception", e);
} }
} }
/** /**
* KERNEL SYNC ------------------------------------------------------- * KERNEL SYNC -------------------------------------------------------
*/ */
public SendResult send(Message msg, MessageQueue mq) public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout()); return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout());
} }
public SendResult send(Message msg, MessageQueue mq, long timeout) public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
if (!msg.getTopic().equals(mq.getTopic())) { if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null); throw new MQClientException("message's topic not equal mq's topic", null);
} }
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) { if (timeout < costTime) {
throw new RemotingTooMuchRequestException("call timeout"); throw new RemotingTooMuchRequestException("call timeout");
} }
return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout); return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
} }
/** /**
* KERNEL ASYNC ------------------------------------------------------- * KERNEL ASYNC -------------------------------------------------------
*/ */
public void send(Message msg, MessageQueue mq, SendCallback sendCallback) public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
} }
/** /**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version * provided in next version
* *
* @param msg * @param msg
* @param mq * @param mq
* @param sendCallback * @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time * @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException * @throws MQClientException
* @throws RemotingException * @throws RemotingException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Deprecated @Deprecated
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
makeSureStateOK(); makeSureStateOK();
Validators.checkMessage(msg, defaultMQProducer); Validators.checkMessage(msg, defaultMQProducer);
if (!msg.getTopic().equals(mq.getTopic())) { if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null); throw new MQClientException("message's topic not equal mq's topic", null);
} }
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) { if (timeout > costTime) {
try { try {
sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
timeout - costTime); timeout - costTime);
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e); throw new MQClientException("unknown exception", e);
} }
} else { } else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
} }
} catch (Exception e) { } catch (Exception e) {
sendCallback.onException(e); sendCallback.onException(e);
} }
} }
}); });
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e); throw new MQClientException("executor rejected ", e);
} }
} }
/** /**
* KERNEL ONEWAY ------------------------------------------------------- * KERNEL ONEWAY -------------------------------------------------------
*/ */
public void sendOneway(Message msg, public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
try { try {
this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout()); this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout());
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e); throw new MQClientException("unknown exception", e);
} }
} }
/** /**
* SELECT SYNC ------------------------------------------------------- * SELECT SYNC -------------------------------------------------------
*/ */
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
} }
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
} }
private SendResult sendSelectImpl( private SendResult sendSelectImpl(
Message msg, Message msg,
MessageQueueSelector selector, MessageQueueSelector selector,
Object arg, Object arg,
final CommunicationMode communicationMode, final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) { if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null; MessageQueue mq = null;
try { try {
List<MessageQueue> messageQueueList = List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg); Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic); userMessage.setTopic(userTopic);
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) { } catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e); throw new MQClientException("select message queue throwed exception.", e);
} }
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) { if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
} }
if (mq != null) { if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else { } else {
throw new MQClientException("select message queue return null.", null); throw new MQClientException("select message queue return null.", null);
} }
} }
validateNameServerSetting(); validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
} }
/** /**
* SELECT ASYNC ------------------------------------------------------- * SELECT ASYNC -------------------------------------------------------
*/ */
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
} }
/** /**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version * provided in next version
* *
* @param msg * @param msg
* @param selector * @param selector
* @param arg * @param arg
* @param sendCallback * @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time * @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException * @throws MQClientException
* @throws RemotingException * @throws RemotingException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Deprecated @Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout) final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) { if (timeout > costTime) {
try { try {
try { try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
timeout - costTime); timeout - costTime);
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e); throw new MQClientException("unknownn exception", e);
} }
} catch (Exception e) { } catch (Exception e) {
sendCallback.onException(e); sendCallback.onException(e);
} }
} else { } else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
} }
} }
}); });
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
throw new MQClientException("exector rejected ", e); throw new MQClientException("exector rejected ", e);
} }
} }
/** /**
* SELECT ONEWAY ------------------------------------------------------- * SELECT ONEWAY -------------------------------------------------------
*/ */
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
try { try {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e); throw new MQClientException("unknown exception", e);
} }
} }
public TransactionSendResult sendMessageInTransaction(final Message msg, public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg) final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException { throws MQClientException {
TransactionListener transactionListener = getCheckListener(); TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) { if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null); throw new MQClientException("tranExecutor is null", null);
} }
// ignore DelayTimeLevel parameter // ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) { if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
} }
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null; SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try { try {
sendResult = this.send(msg); sendResult = this.send(msg);
} catch (Exception e) { } catch (Exception e) {
throw new MQClientException("send message Exception", e); throw new MQClientException("send message Exception", e);
} }
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null; Throwable localException = null;
switch (sendResult.getSendStatus()) { switch (sendResult.getSendStatus()) {
case SEND_OK: { case SEND_OK: {
try { try {
if (sendResult.getTransactionId() != null) { if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
} }
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) { if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId); msg.setTransactionId(transactionId);
} }
if (null != localTransactionExecuter) { if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) { } else if (transactionListener != null) {
log.debug("Used new transaction API"); log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg); localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
} }
if (null == localTransactionState) { if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW; localTransactionState = LocalTransactionState.UNKNOW;
} }
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString()); log.info(msg.toString());
} }
} catch (Throwable e) { } catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e); log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString()); log.info(msg.toString());
localException = e; localException = e;
} }
} }
break; break;
case FLUSH_DISK_TIMEOUT: case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT: case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE: case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break; break;
default: default:
break; break;
} }
try { try {
this.endTransaction(sendResult, localTransactionState, localException); this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) { } catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
} }
TransactionSendResult transactionSendResult = new TransactionSendResult(); TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState); transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult; return transactionSendResult;
} }
/** /**
* DEFAULT SYNC ------------------------------------------------------- * DEFAULT SYNC -------------------------------------------------------
*/ */
public SendResult send( public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout()); return send(msg, this.defaultMQProducer.getSendMsgTimeout());
} }
public void endTransaction( public void endTransaction(
final SendResult sendResult, final SendResult sendResult,
final LocalTransactionState localTransactionState, final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id; final MessageId id;
if (sendResult.getOffsetMsgId() != null) { if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else { } else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
} }
String transactionId = sendResult.getTransactionId(); String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId); requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset()); requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) { switch (localTransactionState) {
case COMMIT_MESSAGE: case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break; break;
case ROLLBACK_MESSAGE: case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break; break;
case UNKNOW: case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break; break;
default: default:
break; break;
} }
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId()); requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout()); this.defaultMQProducer.getSendMsgTimeout());
} }
public void setCallbackExecutor(final ExecutorService callbackExecutor) { public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
} }
public ExecutorService getAsyncSenderExecutor() { public ExecutorService getAsyncSenderExecutor() {
return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor; return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
} }
public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) { public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
this.asyncSenderExecutor = asyncSenderExecutor; this.asyncSenderExecutor = asyncSenderExecutor;
} }
public SendResult send(Message msg, public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
} }
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() { public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable; return topicPublishInfoTable;
} }
public int getZipCompressLevel() { public int getZipCompressLevel() {
return zipCompressLevel; return zipCompressLevel;
} }
public void setZipCompressLevel(int zipCompressLevel) { public void setZipCompressLevel(int zipCompressLevel) {
this.zipCompressLevel = zipCompressLevel; this.zipCompressLevel = zipCompressLevel;
} }
public ServiceState getServiceState() { public ServiceState getServiceState() {
return serviceState; return serviceState;
} }
public void setServiceState(ServiceState serviceState) { public void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState; this.serviceState = serviceState;
} }
public long[] getNotAvailableDuration() { public long[] getNotAvailableDuration() {
return this.mqFaultStrategy.getNotAvailableDuration(); return this.mqFaultStrategy.getNotAvailableDuration();
} }
public void setNotAvailableDuration(final long[] notAvailableDuration) { public void setNotAvailableDuration(final long[] notAvailableDuration) {
this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration); this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration);
} }
public long[] getLatencyMax() { public long[] getLatencyMax() {
return this.mqFaultStrategy.getLatencyMax(); return this.mqFaultStrategy.getLatencyMax();
} }
public void setLatencyMax(final long[] latencyMax) { public void setLatencyMax(final long[] latencyMax) {
this.mqFaultStrategy.setLatencyMax(latencyMax); this.mqFaultStrategy.setLatencyMax(latencyMax);
} }
public boolean isSendLatencyFaultEnable() { public boolean isSendLatencyFaultEnable() {
return this.mqFaultStrategy.isSendLatencyFaultEnable(); return this.mqFaultStrategy.isSendLatencyFaultEnable();
} }
public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable); this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable);
} }
public DefaultMQProducer getDefaultMQProducer() { public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer; return defaultMQProducer;
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册