diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8346dc7ed26a5ff58ef4e583a18b4ecc07b006ae..5b7ba9c6f79d24adb5e9e48b7d777e6c2aa0c018 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -34,7 +34,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -89,1288 +88,1288 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; public class DefaultMQProducerImpl implements MQProducerInner { - private final InternalLogger log = ClientLogger.getLog(); - private final Random random = new Random(); - private final DefaultMQProducer defaultMQProducer; - private final ConcurrentMap topicPublishInfoTable = - new ConcurrentHashMap(); - private final ArrayList sendMessageHookList = new ArrayList(); - private final RPCHook rpcHook; - protected BlockingQueue checkRequestQueue; - protected ExecutorService checkExecutor; - private ServiceState serviceState = ServiceState.CREATE_JUST; - private MQClientInstance mQClientFactory; - private ArrayList checkForbiddenHookList = new ArrayList(); - private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); - - private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); - - private final BlockingQueue asyncSenderThreadPoolQueue; - private final ExecutorService defaultAsyncSenderExecutor; - private ExecutorService asyncSenderExecutor; - - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { - this(defaultMQProducer, null); - } - - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { - this.defaultMQProducer = defaultMQProducer; - this.rpcHook = rpcHook; - - this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000); - this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors(), - Runtime.getRuntime().availableProcessors(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.asyncSenderThreadPoolQueue, - new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); - } - }); - } - - public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { - this.checkForbiddenHookList.add(checkForbiddenHook); - log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), - checkForbiddenHookList.size()); - } - - public void initTransactionEnv() { - TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; - if (producer.getExecutorService() != null) { - this.checkExecutor = producer.getExecutorService(); - } else { - this.checkRequestQueue = new LinkedBlockingQueue(producer.getCheckRequestHoldMax()); - this.checkExecutor = new ThreadPoolExecutor( - producer.getCheckThreadPoolMinSize(), - producer.getCheckThreadPoolMaxSize(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.checkRequestQueue); - } - } - - public void destroyTransactionEnv() { - if (this.checkExecutor != null) { - this.checkExecutor.shutdown(); - } - } - - public void registerSendMessageHook(final SendMessageHook hook) { - this.sendMessageHookList.add(hook); - log.info("register sendMessage Hook, {}", hook.hookName()); - } - - public void start() throws MQClientException { - this.start(true); - } - - public void start(final boolean startFactory) throws MQClientException { - switch (this.serviceState) { - case CREATE_JUST: - this.serviceState = ServiceState.START_FAILED; - - this.checkConfig(); - - if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { - this.defaultMQProducer.changeInstanceNameToPID(); - } - - this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); - - boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); - if (!registerOK) { - this.serviceState = ServiceState.CREATE_JUST; - throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() - + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); - } - - this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); - - if (startFactory) { - mQClientFactory.start(); - } - - log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), - this.defaultMQProducer.isSendMessageWithVIPChannel()); - this.serviceState = ServiceState.RUNNING; - break; - case RUNNING: - case START_FAILED: - case SHUTDOWN_ALREADY: - throw new MQClientException("The producer service state not OK, maybe started once, " - + this.serviceState - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - default: - break; - } - - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - } - - private void checkConfig() throws MQClientException { - Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); - - if (null == this.defaultMQProducer.getProducerGroup()) { - throw new MQClientException("producerGroup is null", null); - } - - if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { - throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", - null); - } - } - - public void shutdown() { - this.shutdown(true); - } - - public void shutdown(final boolean shutdownFactory) { - switch (this.serviceState) { - case CREATE_JUST: - break; - case RUNNING: - this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); - this.defaultAsyncSenderExecutor.shutdown(); - if (shutdownFactory) { - this.mQClientFactory.shutdown(); - } - - log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); - this.serviceState = ServiceState.SHUTDOWN_ALREADY; - break; - case SHUTDOWN_ALREADY: - break; - default: - break; - } - } - - @Override - public Set getPublishTopicList() { - Set topicList = new HashSet(); - for (String key : this.topicPublishInfoTable.keySet()) { - topicList.add(key); - } - - return topicList; - } - - @Override - public boolean isPublishTopicNeedUpdate(String topic) { - TopicPublishInfo prev = this.topicPublishInfoTable.get(topic); - - return null == prev || !prev.ok(); - } - - /** - * This method will be removed in the version 5.0.0 and getCheckListener is recommended. - * - * @return - */ - @Override - @Deprecated - public TransactionCheckListener checkListener() { - if (this.defaultMQProducer instanceof TransactionMQProducer) { - TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; - return producer.getTransactionCheckListener(); - } - - return null; - } - - @Override - public TransactionListener getCheckListener() { - if (this.defaultMQProducer instanceof TransactionMQProducer) { - TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; - return producer.getTransactionListener(); - } - return null; - } - - @Override - public void checkTransactionState(final String addr, final MessageExt msg, - final CheckTransactionStateRequestHeader header) { - Runnable request = new Runnable() { - private final String brokerAddr = addr; - private final MessageExt message = msg; - private final CheckTransactionStateRequestHeader checkRequestHeader = header; - private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); - - @Override - public void run() { - TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); - TransactionListener transactionListener = getCheckListener(); - if (transactionCheckListener != null || transactionListener != null) { - LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; - Throwable exception = null; - try { - if (transactionCheckListener != null) { - localTransactionState = transactionCheckListener.checkLocalTransactionState(message); - } else if (transactionListener != null) { - log.debug("Used new check API in transaction message"); - localTransactionState = transactionListener.checkLocalTransaction(message); - } else { - log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); - } - } catch (Throwable e) { - log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); - exception = e; - } - - this.processTransactionState( - localTransactionState, - group, - exception); - } else { - log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); - } - } - - private void processTransactionState( - final LocalTransactionState localTransactionState, - final String producerGroup, - final Throwable exception) { - final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); - thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); - thisHeader.setProducerGroup(producerGroup); - thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); - thisHeader.setFromTransactionCheck(true); - - String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); - if (uniqueKey == null) { - uniqueKey = message.getMsgId(); - } - thisHeader.setMsgId(uniqueKey); - thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); - switch (localTransactionState) { - case COMMIT_MESSAGE: - thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); - break; - case ROLLBACK_MESSAGE: - thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); - log.warn("when broker check, client rollback this transaction, {}", thisHeader); - break; - case UNKNOW: - thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); - log.warn("when broker check, client does not know this transaction state, {}", thisHeader); - break; - default: - break; - } - - String remark = null; - if (exception != null) { - remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); - } - - try { - DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, - 3000); - } catch (Exception e) { - log.error("endTransactionOneway exception", e); - } - } - }; - - this.checkExecutor.submit(request); - } - - @Override - public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) { - if (info != null && topic != null) { - TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info); - if (prev != null) { - log.info("updateTopicPublishInfo prev is not null, " + prev.toString()); - } - } - } - - @Override - public boolean isUnitMode() { - return this.defaultMQProducer.isUnitMode(); - } - - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.makeSureStateOK(); - Validators.checkTopic(newTopic); - - this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); - } - - private void makeSureStateOK() throws MQClientException { - if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The producer service state not OK, " - + this.serviceState - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - } - } - - public List fetchPublishMessageQueues(String topic) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); - } - - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); - } - - public long maxOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } - - public long minOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().minOffset(mq); - } - - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); - } - - public MessageExt viewMessage( - String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.makeSureStateOK(); - - return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); - } - - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); - } - - public MessageExt queryMessageByUniqKey(String topic, String uniqKey) - throws MQClientException, InterruptedException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); - } - - /** - * DEFAULT ASYNC ------------------------------------------------------- - */ - public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { - send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); - } - - /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be - * provided in next version - * - * @param msg - * @param sendCallback - * @param timeout the sendCallback will be invoked at most time - * @throws RejectedExecutionException - */ - @Deprecated - public void send(final Message msg, final SendCallback sendCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException { - final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getAsyncSenderExecutor(); - try { - executor.submit(new Runnable() { - @Override - public void run() { - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeout > costTime) { - try { - sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); - } catch (Exception e) { - sendCallback.onException(e); - } - } else { - sendCallback.onException( - new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); - } - } - - }); - } catch (RejectedExecutionException e) { - throw new MQClientException("executor rejected ", e); - } - - } - - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { - return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); - } - - public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { - this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); - } - - private void validateNameServerSetting() throws MQClientException { - List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); - if (null == nsList || nsList.isEmpty()) { - throw new MQClientException( - "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); - } - - } - - private SendResult sendDefaultImpl( - Message msg, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final long timeout - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - final long invokeID = random.nextLong(); - long beginTimestampFirst = System.currentTimeMillis(); - long beginTimestampPrev = beginTimestampFirst; - long endTimestamp = beginTimestampFirst; - TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); - if (topicPublishInfo != null && topicPublishInfo.ok()) { - boolean callTimeout = false; - MessageQueue mq = null; - Exception exception = null; - SendResult sendResult = null; - int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; - int times = 0; - String[] brokersSent = new String[timesTotal]; - for (; times < timesTotal; times++) { - String lastBrokerName = null == mq ? null : mq.getBrokerName(); - MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); - if (mqSelected != null) { - mq = mqSelected; - brokersSent[times] = mq.getBrokerName(); - try { - beginTimestampPrev = System.currentTimeMillis(); - if (times > 0) { - //Reset topic with namespace during resend. - msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); - } - long costTime = beginTimestampPrev - beginTimestampFirst; - if (timeout < costTime) { - callTimeout = true; - break; - } - - sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); - switch (communicationMode) { - case ASYNC: - return null; - case ONEWAY: - return null; - case SYNC: - if (sendResult.getSendStatus() != SendStatus.SEND_OK) { - if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { - continue; - } - } - - return sendResult; - default: - break; - } - } catch (RemotingException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - continue; - } catch (MQClientException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - continue; - } catch (MQBrokerException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - switch (e.getResponseCode()) { - case ResponseCode.TOPIC_NOT_EXIST: - case ResponseCode.SERVICE_NOT_AVAILABLE: - case ResponseCode.SYSTEM_ERROR: - case ResponseCode.NO_PERMISSION: - case ResponseCode.NO_BUYER_ID: - case ResponseCode.NOT_IN_CURRENT_UNIT: - continue; - default: - if (sendResult != null) { - return sendResult; - } - - throw e; - } - } catch (InterruptedException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); - log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - - log.warn("sendKernelImpl exception", e); - log.warn(msg.toString()); - throw e; - } - } else { - break; - } - } - - if (sendResult != null) { - return sendResult; - } - - String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", - times, - System.currentTimeMillis() - beginTimestampFirst, - msg.getTopic(), - Arrays.toString(brokersSent)); - - info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); - - MQClientException mqClientException = new MQClientException(info, exception); - if (callTimeout) { - throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); - } - - if (exception instanceof MQBrokerException) { - mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); - } else if (exception instanceof RemotingConnectException) { - mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); - } else if (exception instanceof RemotingTimeoutException) { - mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); - } else if (exception instanceof MQClientException) { - mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); - } - - throw mqClientException; - } - - validateNameServerSetting(); - - throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), - null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); - } - - private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { - TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); - if (null == topicPublishInfo || !topicPublishInfo.ok()) { - this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - topicPublishInfo = this.topicPublishInfoTable.get(topic); - } - - if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { - return topicPublishInfo; - } else { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); - topicPublishInfo = this.topicPublishInfoTable.get(topic); - return topicPublishInfo; - } - } - - private SendResult sendKernelImpl(final Message msg, - final MessageQueue mq, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final TopicPublishInfo topicPublishInfo, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - long beginStartTime = System.currentTimeMillis(); - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - tryToFindTopicPublishInfo(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - } - - SendMessageContext context = null; - if (brokerAddr != null) { - brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); - - byte[] prevBody = msg.getBody(); - try { - //for MessageBatch,ID has been set in the generating process - if (!(msg instanceof MessageBatch)) { - MessageClientIDSetter.setUniqID(msg); - } - - boolean topicWithNamespace = false; - if (null != this.mQClientFactory.getClientConfig().getNamespace()) { - msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); - topicWithNamespace = true; - } - - int sysFlag = 0; - boolean msgBodyCompressed = false; - if (this.tryToCompressMessage(msg)) { - sysFlag |= MessageSysFlag.COMPRESSED_FLAG; - msgBodyCompressed = true; - } - - final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { - sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; - } - - if (hasCheckForbiddenHook()) { - CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); - checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); - checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); - checkForbiddenContext.setCommunicationMode(communicationMode); - checkForbiddenContext.setBrokerAddr(brokerAddr); - checkForbiddenContext.setMessage(msg); - checkForbiddenContext.setMq(mq); - checkForbiddenContext.setUnitMode(this.isUnitMode()); - this.executeCheckForbiddenHook(checkForbiddenContext); - } - - if (this.hasSendMessageHook()) { - context = new SendMessageContext(); - context.setProducer(this); - context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); - context.setCommunicationMode(communicationMode); - context.setBornHost(this.defaultMQProducer.getClientIP()); - context.setBrokerAddr(brokerAddr); - context.setMessage(msg); - context.setMq(mq); - context.setNamespace(this.defaultMQProducer.getNamespace()); - String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - if (isTrans != null && isTrans.equals("true")) { - context.setMsgType(MessageType.Trans_Msg_Half); - } - - if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { - context.setMsgType(MessageType.Delay_Msg); - } - this.executeSendMessageHookBefore(context); - } - - SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); - requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); - requestHeader.setTopic(msg.getTopic()); - requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); - requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); - requestHeader.setQueueId(mq.getQueueId()); - requestHeader.setSysFlag(sysFlag); - requestHeader.setBornTimestamp(System.currentTimeMillis()); - requestHeader.setFlag(msg.getFlag()); - requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); - requestHeader.setReconsumeTimes(0); - requestHeader.setUnitMode(this.isUnitMode()); - requestHeader.setBatch(msg instanceof MessageBatch); - if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); - if (reconsumeTimes != null) { - requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); - MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); - } - - String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); - if (maxReconsumeTimes != null) { - requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); - MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); - } - } - - SendResult sendResult = null; - switch (communicationMode) { - case ASYNC: - Message tmpMessage = msg; - boolean messageCloned = false; - if (msgBodyCompressed) { - //If msg body was compressed, msgbody should be reset using prevBody. - //Clone new message using commpressed message body and recover origin massage. - //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 - tmpMessage = MessageAccessor.cloneMessage(msg); - messageCloned = true; - msg.setBody(prevBody); - } - - if (topicWithNamespace) { - if (!messageCloned) { - tmpMessage = MessageAccessor.cloneMessage(msg); - messageCloned = true; - } - msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); - } - - long costTimeAsync = System.currentTimeMillis() - beginStartTime; - if (timeout < costTimeAsync) { - throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); - } - sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( - brokerAddr, - mq.getBrokerName(), - tmpMessage, - requestHeader, - timeout - costTimeAsync, - communicationMode, - sendCallback, - topicPublishInfo, - this.mQClientFactory, - this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), - context, - this); - break; - case ONEWAY: - case SYNC: - long costTimeSync = System.currentTimeMillis() - beginStartTime; - if (timeout < costTimeSync) { - throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); - } - sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( - brokerAddr, - mq.getBrokerName(), - msg, - requestHeader, - timeout - costTimeSync, - communicationMode, - context, - this); - break; - default: - assert false; - break; - } - - if (this.hasSendMessageHook()) { - context.setSendResult(sendResult); - this.executeSendMessageHookAfter(context); - } - - return sendResult; - } catch (RemotingException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } catch (MQBrokerException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } catch (InterruptedException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } finally { - msg.setBody(prevBody); - msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); - } - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - public MQClientInstance getmQClientFactory() { - return mQClientFactory; - } - - private boolean tryToCompressMessage(final Message msg) { - if (msg instanceof MessageBatch) { - //batch dose not support compressing right now - return false; - } - byte[] body = msg.getBody(); - if (body != null) { - if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { - try { - byte[] data = UtilAll.compress(body, zipCompressLevel); - if (data != null) { - msg.setBody(data); - return true; - } - } catch (IOException e) { - log.error("tryToCompressMessage exception", e); - log.warn(msg.toString()); - } - } - } - - return false; - } - - public boolean hasCheckForbiddenHook() { - return !checkForbiddenHookList.isEmpty(); - } - - public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException { - if (hasCheckForbiddenHook()) { - for (CheckForbiddenHook hook : checkForbiddenHookList) { - hook.checkForbidden(context); - } - } - } - - public boolean hasSendMessageHook() { - return !this.sendMessageHookList.isEmpty(); - } - - public void executeSendMessageHookBefore(final SendMessageContext context) { - if (!this.sendMessageHookList.isEmpty()) { - for (SendMessageHook hook : this.sendMessageHookList) { - try { - hook.sendMessageBefore(context); - } catch (Throwable e) { - log.warn("failed to executeSendMessageHookBefore", e); - } - } - } - } - - public void executeSendMessageHookAfter(final SendMessageContext context) { - if (!this.sendMessageHookList.isEmpty()) { - for (SendMessageHook hook : this.sendMessageHookList) { - try { - hook.sendMessageAfter(context); - } catch (Throwable e) { - log.warn("failed to executeSendMessageHookAfter", e); - } - } - } - } - - /** - * DEFAULT ONEWAY ------------------------------------------------------- - */ - public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { - try { - this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - /** - * KERNEL SYNC ------------------------------------------------------- - */ - public SendResult send(Message msg, MessageQueue mq) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout()); - } - - public SendResult send(Message msg, MessageQueue mq, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - long beginStartTime = System.currentTimeMillis(); - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); - } - - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeout < costTime) { - throw new RemotingTooMuchRequestException("call timeout"); - } - - return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout); - } - - /** - * KERNEL ASYNC ------------------------------------------------------- - */ - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { - send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); - } - - /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be - * provided in next version - * - * @param msg - * @param mq - * @param sendCallback - * @param timeout the sendCallback will be invoked at most time - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException - */ - @Deprecated - public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException { - final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getAsyncSenderExecutor(); - try { - executor.submit(new Runnable() { - @Override - public void run() { - try { - makeSureStateOK(); - Validators.checkMessage(msg, defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); - } - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeout > costTime) { - try { - sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, - timeout - costTime); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } else { - sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); - } - } catch (Exception e) { - sendCallback.onException(e); - } - - } - - }); - } catch (RejectedExecutionException e) { - throw new MQClientException("executor rejected ", e); - } - - } - - /** - * KERNEL ONEWAY ------------------------------------------------------- - */ - public void sendOneway(Message msg, - MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - try { - this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout()); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - /** - * SELECT SYNC ------------------------------------------------------- - */ - public SendResult send(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); - } - - public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); - } - - private SendResult sendSelectImpl( - Message msg, - MessageQueueSelector selector, - Object arg, - final CommunicationMode communicationMode, - final SendCallback sendCallback, final long timeout - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - long beginStartTime = System.currentTimeMillis(); - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); - if (topicPublishInfo != null && topicPublishInfo.ok()) { - MessageQueue mq = null; - try { - List messageQueueList = - mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); - Message userMessage = MessageAccessor.cloneMessage(msg); - String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); - userMessage.setTopic(userTopic); - - mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); - } catch (Throwable e) { - throw new MQClientException("select message queue throwed exception.", e); - } - - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeout < costTime) { - throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); - } - if (mq != null) { - return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); - } else { - throw new MQClientException("select message queue return null.", null); - } - } - - validateNameServerSetting(); - throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); - } - - /** - * SELECT ASYNC ------------------------------------------------------- - */ - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { - send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); - } - - /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be - * provided in next version - * - * @param msg - * @param selector - * @param arg - * @param sendCallback - * @param timeout the sendCallback will be invoked at most time - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException - */ - @Deprecated - public void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException { - final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getAsyncSenderExecutor(); - try { - executor.submit(new Runnable() { - @Override - public void run() { - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeout > costTime) { - try { - try { - sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, - timeout - costTime); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); - } - } catch (Exception e) { - sendCallback.onException(e); - } - } else { - sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); - } - } - - }); - } catch (RejectedExecutionException e) { - throw new MQClientException("exector rejected ", e); - } - } - - /** - * SELECT ONEWAY ------------------------------------------------------- - */ - public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, InterruptedException { - try { - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter localTransactionExecuter, final Object arg) - throws MQClientException { - TransactionListener transactionListener = getCheckListener(); - if (null == localTransactionExecuter && null == transactionListener) { - throw new MQClientException("tranExecutor is null", null); - } - - // ignore DelayTimeLevel parameter - if (msg.getDelayTimeLevel() != 0) { - MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); - } - - Validators.checkMessage(msg, this.defaultMQProducer); - - SendResult sendResult = null; - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); - try { - sendResult = this.send(msg); - } catch (Exception e) { - throw new MQClientException("send message Exception", e); - } - - LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; - Throwable localException = null; - switch (sendResult.getSendStatus()) { - case SEND_OK: { - try { - if (sendResult.getTransactionId() != null) { - msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); - } - String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); - if (null != transactionId && !"".equals(transactionId)) { - msg.setTransactionId(transactionId); - } - if (null != localTransactionExecuter) { - localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); - } else if (transactionListener != null) { - log.debug("Used new transaction API"); - localTransactionState = transactionListener.executeLocalTransaction(msg, arg); - } - if (null == localTransactionState) { - localTransactionState = LocalTransactionState.UNKNOW; - } - - if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { - log.info("executeLocalTransactionBranch return {}", localTransactionState); - log.info(msg.toString()); - } - } catch (Throwable e) { - log.info("executeLocalTransactionBranch exception", e); - log.info(msg.toString()); - localException = e; - } - } - break; - case FLUSH_DISK_TIMEOUT: - case FLUSH_SLAVE_TIMEOUT: - case SLAVE_NOT_AVAILABLE: - localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; - break; - default: - break; - } - - try { - this.endTransaction(sendResult, localTransactionState, localException); - } catch (Exception e) { - log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); - } - - TransactionSendResult transactionSendResult = new TransactionSendResult(); - transactionSendResult.setSendStatus(sendResult.getSendStatus()); - transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); - transactionSendResult.setMsgId(sendResult.getMsgId()); - transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); - transactionSendResult.setTransactionId(sendResult.getTransactionId()); - transactionSendResult.setLocalTransactionState(localTransactionState); - return transactionSendResult; - } - - /** - * DEFAULT SYNC ------------------------------------------------------- - */ - public SendResult send( - Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return send(msg, this.defaultMQProducer.getSendMsgTimeout()); - } - - public void endTransaction( - final SendResult sendResult, - final LocalTransactionState localTransactionState, - final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { - final MessageId id; - if (sendResult.getOffsetMsgId() != null) { - id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); - } else { - id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); - } - String transactionId = sendResult.getTransactionId(); - final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); - EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); - requestHeader.setTransactionId(transactionId); - requestHeader.setCommitLogOffset(id.getOffset()); - switch (localTransactionState) { - case COMMIT_MESSAGE: - requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); - break; - case ROLLBACK_MESSAGE: - requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); - break; - case UNKNOW: - requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); - break; - default: - break; - } - - requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); - requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); - requestHeader.setMsgId(sendResult.getMsgId()); - String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; - this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, - this.defaultMQProducer.getSendMsgTimeout()); - } - - public void setCallbackExecutor(final ExecutorService callbackExecutor) { - this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); - } - - public ExecutorService getAsyncSenderExecutor() { - return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor; - } - - public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) { - this.asyncSenderExecutor = asyncSenderExecutor; - } - - public SendResult send(Message msg, - long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); - } - - public ConcurrentMap getTopicPublishInfoTable() { - return topicPublishInfoTable; - } - - public int getZipCompressLevel() { - return zipCompressLevel; - } - - public void setZipCompressLevel(int zipCompressLevel) { - this.zipCompressLevel = zipCompressLevel; - } - - public ServiceState getServiceState() { - return serviceState; - } - - public void setServiceState(ServiceState serviceState) { - this.serviceState = serviceState; - } - - public long[] getNotAvailableDuration() { - return this.mqFaultStrategy.getNotAvailableDuration(); - } - - public void setNotAvailableDuration(final long[] notAvailableDuration) { - this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration); - } - - public long[] getLatencyMax() { - return this.mqFaultStrategy.getLatencyMax(); - } - - public void setLatencyMax(final long[] latencyMax) { - this.mqFaultStrategy.setLatencyMax(latencyMax); - } - - public boolean isSendLatencyFaultEnable() { - return this.mqFaultStrategy.isSendLatencyFaultEnable(); - } - - public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { - this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable); - } - - public DefaultMQProducer getDefaultMQProducer() { - return defaultMQProducer; - } + private final InternalLogger log = ClientLogger.getLog(); + private final Random random = new Random(); + private final DefaultMQProducer defaultMQProducer; + private final ConcurrentMap topicPublishInfoTable = + new ConcurrentHashMap(); + private final ArrayList sendMessageHookList = new ArrayList(); + private final RPCHook rpcHook; + protected BlockingQueue checkRequestQueue; + protected ExecutorService checkExecutor; + private ServiceState serviceState = ServiceState.CREATE_JUST; + private MQClientInstance mQClientFactory; + private ArrayList checkForbiddenHookList = new ArrayList(); + private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); + + private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + + private final BlockingQueue asyncSenderThreadPoolQueue; + private final ExecutorService defaultAsyncSenderExecutor; + private ExecutorService asyncSenderExecutor; + + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { + this(defaultMQProducer, null); + } + + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { + this.defaultMQProducer = defaultMQProducer; + this.rpcHook = rpcHook; + + this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000); + this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.asyncSenderThreadPoolQueue, + new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); + } + }); + } + + public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { + this.checkForbiddenHookList.add(checkForbiddenHook); + log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), + checkForbiddenHookList.size()); + } + + public void initTransactionEnv() { + TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; + if (producer.getExecutorService() != null) { + this.checkExecutor = producer.getExecutorService(); + } else { + this.checkRequestQueue = new LinkedBlockingQueue(producer.getCheckRequestHoldMax()); + this.checkExecutor = new ThreadPoolExecutor( + producer.getCheckThreadPoolMinSize(), + producer.getCheckThreadPoolMaxSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.checkRequestQueue); + } + } + + public void destroyTransactionEnv() { + if (this.checkExecutor != null) { + this.checkExecutor.shutdown(); + } + } + + public void registerSendMessageHook(final SendMessageHook hook) { + this.sendMessageHookList.add(hook); + log.info("register sendMessage Hook, {}", hook.hookName()); + } + + public void start() throws MQClientException { + this.start(true); + } + + public void start(final boolean startFactory) throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + this.serviceState = ServiceState.START_FAILED; + + this.checkConfig(); + + if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { + this.defaultMQProducer.changeInstanceNameToPID(); + } + + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); + + boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); + } + + this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); + + if (startFactory) { + mQClientFactory.start(); + } + + log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), + this.defaultMQProducer.isSendMessageWithVIPChannel()); + this.serviceState = ServiceState.RUNNING; + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException("The producer service state not OK, maybe started once, " + + this.serviceState + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + default: + break; + } + + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + + private void checkConfig() throws MQClientException { + Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); + + if (null == this.defaultMQProducer.getProducerGroup()) { + throw new MQClientException("producerGroup is null", null); + } + + if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { + throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", + null); + } + } + + public void shutdown() { + this.shutdown(true); + } + + public void shutdown(final boolean shutdownFactory) { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); + this.defaultAsyncSenderExecutor.shutdown(); + if (shutdownFactory) { + this.mQClientFactory.shutdown(); + } + + log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } + } + + @Override + public Set getPublishTopicList() { + Set topicList = new HashSet(); + for (String key : this.topicPublishInfoTable.keySet()) { + topicList.add(key); + } + + return topicList; + } + + @Override + public boolean isPublishTopicNeedUpdate(String topic) { + TopicPublishInfo prev = this.topicPublishInfoTable.get(topic); + + return null == prev || !prev.ok(); + } + + /** + * This method will be removed in the version 5.0.0 and getCheckListener is recommended. + * + * @return + */ + @Override + @Deprecated + public TransactionCheckListener checkListener() { + if (this.defaultMQProducer instanceof TransactionMQProducer) { + TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; + return producer.getTransactionCheckListener(); + } + + return null; + } + + @Override + public TransactionListener getCheckListener() { + if (this.defaultMQProducer instanceof TransactionMQProducer) { + TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; + return producer.getTransactionListener(); + } + return null; + } + + @Override + public void checkTransactionState(final String addr, final MessageExt msg, + final CheckTransactionStateRequestHeader header) { + Runnable request = new Runnable() { + private final String brokerAddr = addr; + private final MessageExt message = msg; + private final CheckTransactionStateRequestHeader checkRequestHeader = header; + private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); + + @Override + public void run() { + TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); + TransactionListener transactionListener = getCheckListener(); + if (transactionCheckListener != null || transactionListener != null) { + LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; + Throwable exception = null; + try { + if (transactionCheckListener != null) { + localTransactionState = transactionCheckListener.checkLocalTransactionState(message); + } else if (transactionListener != null) { + log.debug("Used new check API in transaction message"); + localTransactionState = transactionListener.checkLocalTransaction(message); + } else { + log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); + } + } catch (Throwable e) { + log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); + exception = e; + } + + this.processTransactionState( + localTransactionState, + group, + exception); + } else { + log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); + } + } + + private void processTransactionState( + final LocalTransactionState localTransactionState, + final String producerGroup, + final Throwable exception) { + final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); + thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); + thisHeader.setProducerGroup(producerGroup); + thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); + thisHeader.setFromTransactionCheck(true); + + String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (uniqueKey == null) { + uniqueKey = message.getMsgId(); + } + thisHeader.setMsgId(uniqueKey); + thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); + switch (localTransactionState) { + case COMMIT_MESSAGE: + thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); + break; + case ROLLBACK_MESSAGE: + thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); + log.warn("when broker check, client rollback this transaction, {}", thisHeader); + break; + case UNKNOW: + thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); + log.warn("when broker check, client does not know this transaction state, {}", thisHeader); + break; + default: + break; + } + + String remark = null; + if (exception != null) { + remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); + } + + try { + DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, + 3000); + } catch (Exception e) { + log.error("endTransactionOneway exception", e); + } + } + }; + + this.checkExecutor.submit(request); + } + + @Override + public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) { + if (info != null && topic != null) { + TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info); + if (prev != null) { + log.info("updateTopicPublishInfo prev is not null, " + prev.toString()); + } + } + } + + @Override + public boolean isUnitMode() { + return this.defaultMQProducer.isUnitMode(); + } + + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.makeSureStateOK(); + Validators.checkTopic(newTopic); + + this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + private void makeSureStateOK() throws MQClientException { + if (this.serviceState != ServiceState.RUNNING) { + throw new MQClientException("The producer service state not OK, " + + this.serviceState + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + } + } + + public List fetchPublishMessageQueues(String topic) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } + + public long maxOffset(MessageQueue mq) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } + + public long minOffset(MessageQueue mq) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().minOffset(mq); + } + + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); + } + + public MessageExt viewMessage( + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.makeSureStateOK(); + + return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); + } + + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); + } + + public MessageExt queryMessageByUniqKey(String topic, String uniqKey) + throws MQClientException, InterruptedException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); + } + + /** + * DEFAULT ASYNC ------------------------------------------------------- + */ + public void send(Message msg, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); + } + + /** + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * + * @param msg + * @param sendCallback + * @param timeout the sendCallback will be invoked at most time + * @throws RejectedExecutionException + */ + @Deprecated + public void send(final Message msg, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException { + final long beginStartTime = System.currentTimeMillis(); + ExecutorService executor = this.getAsyncSenderExecutor(); + try { + executor.submit(new Runnable() { + @Override + public void run() { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout > costTime) { + try { + sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); + } catch (Exception e) { + sendCallback.onException(e); + } + } else { + sendCallback.onException( + new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); + } + } + + }); + } catch (RejectedExecutionException e) { + throw new MQClientException("executor rejected ", e); + } + + } + + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { + return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); + } + + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { + this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); + } + + private void validateNameServerSetting() throws MQClientException { + List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); + if (null == nsList || nsList.isEmpty()) { + throw new MQClientException( + "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); + } + + } + + private SendResult sendDefaultImpl( + Message msg, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final long timeout + ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + final long invokeID = random.nextLong(); + long beginTimestampFirst = System.currentTimeMillis(); + long beginTimestampPrev = beginTimestampFirst; + long endTimestamp = beginTimestampFirst; + TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); + if (topicPublishInfo != null && topicPublishInfo.ok()) { + boolean callTimeout = false; + MessageQueue mq = null; + Exception exception = null; + SendResult sendResult = null; + int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; + int times = 0; + String[] brokersSent = new String[timesTotal]; + for (; times < timesTotal; times++) { + String lastBrokerName = null == mq ? null : mq.getBrokerName(); + MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + if (mqSelected != null) { + mq = mqSelected; + brokersSent[times] = mq.getBrokerName(); + try { + beginTimestampPrev = System.currentTimeMillis(); + if (times > 0) { + //Reset topic with namespace during resend. + msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); + } + long costTime = beginTimestampPrev - beginTimestampFirst; + if (timeout < costTime) { + callTimeout = true; + break; + } + + sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); + switch (communicationMode) { + case ASYNC: + return null; + case ONEWAY: + return null; + case SYNC: + if (sendResult.getSendStatus() != SendStatus.SEND_OK) { + if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { + continue; + } + } + + return sendResult; + default: + break; + } + } catch (RemotingException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + continue; + } catch (MQClientException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + continue; + } catch (MQBrokerException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + switch (e.getResponseCode()) { + case ResponseCode.TOPIC_NOT_EXIST: + case ResponseCode.SERVICE_NOT_AVAILABLE: + case ResponseCode.SYSTEM_ERROR: + case ResponseCode.NO_PERMISSION: + case ResponseCode.NO_BUYER_ID: + case ResponseCode.NOT_IN_CURRENT_UNIT: + continue; + default: + if (sendResult != null) { + return sendResult; + } + + throw e; + } + } catch (InterruptedException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); + log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + + log.warn("sendKernelImpl exception", e); + log.warn(msg.toString()); + throw e; + } + } else { + break; + } + } + + if (sendResult != null) { + return sendResult; + } + + String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", + times, + System.currentTimeMillis() - beginTimestampFirst, + msg.getTopic(), + Arrays.toString(brokersSent)); + + info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); + + MQClientException mqClientException = new MQClientException(info, exception); + if (callTimeout) { + throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); + } + + if (exception instanceof MQBrokerException) { + mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); + } else if (exception instanceof RemotingConnectException) { + mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); + } else if (exception instanceof RemotingTimeoutException) { + mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); + } else if (exception instanceof MQClientException) { + mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); + } + + throw mqClientException; + } + + validateNameServerSetting(); + + throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), + null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); + } + + private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { + TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + topicPublishInfo = this.topicPublishInfoTable.get(topic); + } + + if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { + return topicPublishInfo; + } else { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); + topicPublishInfo = this.topicPublishInfoTable.get(topic); + return topicPublishInfo; + } + } + + private SendResult sendKernelImpl(final Message msg, + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + long beginStartTime = System.currentTimeMillis(); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); + if (null == brokerAddr) { + tryToFindTopicPublishInfo(mq.getTopic()); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); + } + + SendMessageContext context = null; + if (brokerAddr != null) { + brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); + + byte[] prevBody = msg.getBody(); + try { + //for MessageBatch,ID has been set in the generating process + if (!(msg instanceof MessageBatch)) { + MessageClientIDSetter.setUniqID(msg); + } + + boolean topicWithNamespace = false; + if (null != this.mQClientFactory.getClientConfig().getNamespace()) { + msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); + topicWithNamespace = true; + } + + int sysFlag = 0; + boolean msgBodyCompressed = false; + if (this.tryToCompressMessage(msg)) { + sysFlag |= MessageSysFlag.COMPRESSED_FLAG; + msgBodyCompressed = true; + } + + final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); + if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { + sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; + } + + if (hasCheckForbiddenHook()) { + CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); + checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); + checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); + checkForbiddenContext.setCommunicationMode(communicationMode); + checkForbiddenContext.setBrokerAddr(brokerAddr); + checkForbiddenContext.setMessage(msg); + checkForbiddenContext.setMq(mq); + checkForbiddenContext.setUnitMode(this.isUnitMode()); + this.executeCheckForbiddenHook(checkForbiddenContext); + } + + if (this.hasSendMessageHook()) { + context = new SendMessageContext(); + context.setProducer(this); + context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + context.setCommunicationMode(communicationMode); + context.setBornHost(this.defaultMQProducer.getClientIP()); + context.setBrokerAddr(brokerAddr); + context.setMessage(msg); + context.setMq(mq); + context.setNamespace(this.defaultMQProducer.getNamespace()); + String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); + if (isTrans != null && isTrans.equals("true")) { + context.setMsgType(MessageType.Trans_Msg_Half); + } + + if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { + context.setMsgType(MessageType.Delay_Msg); + } + this.executeSendMessageHookBefore(context); + } + + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + requestHeader.setTopic(msg.getTopic()); + requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); + requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setSysFlag(sysFlag); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(msg.getFlag()); + requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); + requestHeader.setReconsumeTimes(0); + requestHeader.setUnitMode(this.isUnitMode()); + requestHeader.setBatch(msg instanceof MessageBatch); + if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); + if (reconsumeTimes != null) { + requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); + } + + String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); + if (maxReconsumeTimes != null) { + requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); + } + } + + SendResult sendResult = null; + switch (communicationMode) { + case ASYNC: + Message tmpMessage = msg; + boolean messageCloned = false; + if (msgBodyCompressed) { + //If msg body was compressed, msgbody should be reset using prevBody. + //Clone new message using commpressed message body and recover origin massage. + //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 + tmpMessage = MessageAccessor.cloneMessage(msg); + messageCloned = true; + msg.setBody(prevBody); + } + + if (topicWithNamespace) { + if (!messageCloned) { + tmpMessage = MessageAccessor.cloneMessage(msg); + messageCloned = true; + } + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); + } + + long costTimeAsync = System.currentTimeMillis() - beginStartTime; + if (timeout < costTimeAsync) { + throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); + } + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( + brokerAddr, + mq.getBrokerName(), + tmpMessage, + requestHeader, + timeout - costTimeAsync, + communicationMode, + sendCallback, + topicPublishInfo, + this.mQClientFactory, + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), + context, + this); + break; + case ONEWAY: + case SYNC: + long costTimeSync = System.currentTimeMillis() - beginStartTime; + if (timeout < costTimeSync) { + throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); + } + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( + brokerAddr, + mq.getBrokerName(), + msg, + requestHeader, + timeout - costTimeSync, + communicationMode, + context, + this); + break; + default: + assert false; + break; + } + + if (this.hasSendMessageHook()) { + context.setSendResult(sendResult); + this.executeSendMessageHookAfter(context); + } + + return sendResult; + } catch (RemotingException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } catch (MQBrokerException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } catch (InterruptedException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } finally { + msg.setBody(prevBody); + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); + } + } + + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + + public MQClientInstance getmQClientFactory() { + return mQClientFactory; + } + + private boolean tryToCompressMessage(final Message msg) { + if (msg instanceof MessageBatch) { + //batch dose not support compressing right now + return false; + } + byte[] body = msg.getBody(); + if (body != null) { + if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { + try { + byte[] data = UtilAll.compress(body, zipCompressLevel); + if (data != null) { + msg.setBody(data); + return true; + } + } catch (IOException e) { + log.error("tryToCompressMessage exception", e); + log.warn(msg.toString()); + } + } + } + + return false; + } + + public boolean hasCheckForbiddenHook() { + return !checkForbiddenHookList.isEmpty(); + } + + public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException { + if (hasCheckForbiddenHook()) { + for (CheckForbiddenHook hook : checkForbiddenHookList) { + hook.checkForbidden(context); + } + } + } + + public boolean hasSendMessageHook() { + return !this.sendMessageHookList.isEmpty(); + } + + public void executeSendMessageHookBefore(final SendMessageContext context) { + if (!this.sendMessageHookList.isEmpty()) { + for (SendMessageHook hook : this.sendMessageHookList) { + try { + hook.sendMessageBefore(context); + } catch (Throwable e) { + log.warn("failed to executeSendMessageHookBefore", e); + } + } + } + } + + public void executeSendMessageHookAfter(final SendMessageContext context) { + if (!this.sendMessageHookList.isEmpty()) { + for (SendMessageHook hook : this.sendMessageHookList) { + try { + hook.sendMessageAfter(context); + } catch (Throwable e) { + log.warn("failed to executeSendMessageHookAfter", e); + } + } + } + } + + /** + * DEFAULT ONEWAY ------------------------------------------------------- + */ + public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { + try { + this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + /** + * KERNEL SYNC ------------------------------------------------------- + */ + public SendResult send(Message msg, MessageQueue mq) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout()); + } + + public SendResult send(Message msg, MessageQueue mq, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + long beginStartTime = System.currentTimeMillis(); + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout < costTime) { + throw new RemotingTooMuchRequestException("call timeout"); + } + + return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout); + } + + /** + * KERNEL ASYNC ------------------------------------------------------- + */ + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); + } + + /** + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * + * @param msg + * @param mq + * @param sendCallback + * @param timeout the sendCallback will be invoked at most time + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + @Deprecated + public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException { + final long beginStartTime = System.currentTimeMillis(); + ExecutorService executor = this.getAsyncSenderExecutor(); + try { + executor.submit(new Runnable() { + @Override + public void run() { + try { + makeSureStateOK(); + Validators.checkMessage(msg, defaultMQProducer); + + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout > costTime) { + try { + sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, + timeout - costTime); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } else { + sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); + } + } catch (Exception e) { + sendCallback.onException(e); + } + + } + + }); + } catch (RejectedExecutionException e) { + throw new MQClientException("executor rejected ", e); + } + + } + + /** + * KERNEL ONEWAY ------------------------------------------------------- + */ + public void sendOneway(Message msg, + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + try { + this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout()); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + /** + * SELECT SYNC ------------------------------------------------------- + */ + public SendResult send(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); + } + + public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); + } + + private SendResult sendSelectImpl( + Message msg, + MessageQueueSelector selector, + Object arg, + final CommunicationMode communicationMode, + final SendCallback sendCallback, final long timeout + ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + long beginStartTime = System.currentTimeMillis(); + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); + if (topicPublishInfo != null && topicPublishInfo.ok()) { + MessageQueue mq = null; + try { + List messageQueueList = + mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); + Message userMessage = MessageAccessor.cloneMessage(msg); + String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); + userMessage.setTopic(userTopic); + + mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); + } catch (Throwable e) { + throw new MQClientException("select message queue throwed exception.", e); + } + + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout < costTime) { + throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); + } + if (mq != null) { + return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); + } else { + throw new MQClientException("select message queue return null.", null); + } + } + + validateNameServerSetting(); + throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); + } + + /** + * SELECT ASYNC ------------------------------------------------------- + */ + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); + } + + /** + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * + * @param msg + * @param selector + * @param arg + * @param sendCallback + * @param timeout the sendCallback will be invoked at most time + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + @Deprecated + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException { + final long beginStartTime = System.currentTimeMillis(); + ExecutorService executor = this.getAsyncSenderExecutor(); + try { + executor.submit(new Runnable() { + @Override + public void run() { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout > costTime) { + try { + try { + sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, + timeout - costTime); + } catch (MQBrokerException e) { + throw new MQClientException("unknownn exception", e); + } + } catch (Exception e) { + sendCallback.onException(e); + } + } else { + sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); + } + } + + }); + } catch (RejectedExecutionException e) { + throw new MQClientException("exector rejected ", e); + } + } + + /** + * SELECT ONEWAY ------------------------------------------------------- + */ + public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, InterruptedException { + try { + this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + public TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter localTransactionExecuter, final Object arg) + throws MQClientException { + TransactionListener transactionListener = getCheckListener(); + if (null == localTransactionExecuter && null == transactionListener) { + throw new MQClientException("tranExecutor is null", null); + } + + // ignore DelayTimeLevel parameter + if (msg.getDelayTimeLevel() != 0) { + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + } + + Validators.checkMessage(msg, this.defaultMQProducer); + + SendResult sendResult = null; + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); + try { + sendResult = this.send(msg); + } catch (Exception e) { + throw new MQClientException("send message Exception", e); + } + + LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; + Throwable localException = null; + switch (sendResult.getSendStatus()) { + case SEND_OK: { + try { + if (sendResult.getTransactionId() != null) { + msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); + } + String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (null != transactionId && !"".equals(transactionId)) { + msg.setTransactionId(transactionId); + } + if (null != localTransactionExecuter) { + localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); + } else if (transactionListener != null) { + log.debug("Used new transaction API"); + localTransactionState = transactionListener.executeLocalTransaction(msg, arg); + } + if (null == localTransactionState) { + localTransactionState = LocalTransactionState.UNKNOW; + } + + if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { + log.info("executeLocalTransactionBranch return {}", localTransactionState); + log.info(msg.toString()); + } + } catch (Throwable e) { + log.info("executeLocalTransactionBranch exception", e); + log.info(msg.toString()); + localException = e; + } + } + break; + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case SLAVE_NOT_AVAILABLE: + localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; + break; + default: + break; + } + + try { + this.endTransaction(sendResult, localTransactionState, localException); + } catch (Exception e) { + log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); + } + + TransactionSendResult transactionSendResult = new TransactionSendResult(); + transactionSendResult.setSendStatus(sendResult.getSendStatus()); + transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); + transactionSendResult.setMsgId(sendResult.getMsgId()); + transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); + transactionSendResult.setTransactionId(sendResult.getTransactionId()); + transactionSendResult.setLocalTransactionState(localTransactionState); + return transactionSendResult; + } + + /** + * DEFAULT SYNC ------------------------------------------------------- + */ + public SendResult send( + Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return send(msg, this.defaultMQProducer.getSendMsgTimeout()); + } + + public void endTransaction( + final SendResult sendResult, + final LocalTransactionState localTransactionState, + final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { + final MessageId id; + if (sendResult.getOffsetMsgId() != null) { + id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); + } else { + id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); + } + String transactionId = sendResult.getTransactionId(); + final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); + EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); + requestHeader.setTransactionId(transactionId); + requestHeader.setCommitLogOffset(id.getOffset()); + switch (localTransactionState) { + case COMMIT_MESSAGE: + requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); + break; + case ROLLBACK_MESSAGE: + requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); + break; + case UNKNOW: + requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); + break; + default: + break; + } + + requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); + requestHeader.setMsgId(sendResult.getMsgId()); + String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; + this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, + this.defaultMQProducer.getSendMsgTimeout()); + } + + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); + } + + public ExecutorService getAsyncSenderExecutor() { + return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor; + } + + public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) { + this.asyncSenderExecutor = asyncSenderExecutor; + } + + public SendResult send(Message msg, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); + } + + public ConcurrentMap getTopicPublishInfoTable() { + return topicPublishInfoTable; + } + + public int getZipCompressLevel() { + return zipCompressLevel; + } + + public void setZipCompressLevel(int zipCompressLevel) { + this.zipCompressLevel = zipCompressLevel; + } + + public ServiceState getServiceState() { + return serviceState; + } + + public void setServiceState(ServiceState serviceState) { + this.serviceState = serviceState; + } + + public long[] getNotAvailableDuration() { + return this.mqFaultStrategy.getNotAvailableDuration(); + } + + public void setNotAvailableDuration(final long[] notAvailableDuration) { + this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration); + } + + public long[] getLatencyMax() { + return this.mqFaultStrategy.getLatencyMax(); + } + + public void setLatencyMax(final long[] latencyMax) { + this.mqFaultStrategy.setLatencyMax(latencyMax); + } + + public boolean isSendLatencyFaultEnable() { + return this.mqFaultStrategy.isSendLatencyFaultEnable(); + } + + public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { + this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable); + } + + public DefaultMQProducer getDefaultMQProducer() { + return defaultMQProducer; + } }