From 46b2e1250b7d728b3a2212624603e785c578bdf1 Mon Sep 17 00:00:00 2001 From: qqeasonchen Date: Tue, 27 Aug 2019 21:16:31 +0800 Subject: [PATCH] [RIP-16]impl rpc support - code format --- .../rocketmq/broker/BrokerController.java | 37 ++++------ .../broker/client/ProducerManager.java | 3 +- .../processor/ReplyMessageProcessor.java | 70 ++++++++++++------ .../broker/topic/TopicConfigManager.java | 4 +- .../exception/RequestTimeoutException.java | 20 +++++- .../client/impl/ClientRemotingProcessor.java | 14 ++-- .../rocketmq/client/impl/MQClientAPIImpl.java | 12 ++-- .../impl/producer/DefaultMQProducerImpl.java | 69 +++++++++--------- .../client/producer/DefaultMQProducer.java | 72 +++++++------------ .../rocketmq/client/producer/MQProducer.java | 17 ++--- .../client/producer/RequestCallback.java | 17 +++++ .../client/producer/RequestFutureTable.java | 30 ++++++-- .../producer/RequestResponseFuture.java | 34 ++++++--- .../rocketmq/client/utils/MessageUtil.java | 17 +++++ .../apache/rocketmq/common/BrokerConfig.java | 2 +- .../org/apache/rocketmq/common/MixAll.java | 8 +-- .../header/ReplyMessageRequestHeader.java | 17 +++++ .../rocketmq/common/utils/RequestIdUtil.java | 19 ++++- 18 files changed, 284 insertions(+), 178 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 84b3e024..8739d9fc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -61,8 +61,8 @@ import org.apache.rocketmq.broker.processor.ConsumerManageProcessor; import org.apache.rocketmq.broker.processor.EndTransactionProcessor; import org.apache.rocketmq.broker.processor.PullMessageProcessor; import org.apache.rocketmq.broker.processor.QueryMessageProcessor; -import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.processor.ReplyMessageProcessor; +import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; @@ -166,7 +166,7 @@ public class BrokerController { private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; private Future slaveSyncFuture; - private Map accessValidatorMap = new HashMap(); + private Map accessValidatorMap = new HashMap(); public BrokerController( final BrokerConfig brokerConfig, @@ -245,7 +245,7 @@ public class BrokerController { this.brokerConfig); if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); - ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); + ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); } this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin @@ -282,12 +282,12 @@ public class BrokerController { new ThreadFactoryImpl("PullMessageThread_")); this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getProcessReplyMessageThreadPoolNums(), - this.brokerConfig.getProcessReplyMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.replyThreadPoolQueue, - new ThreadFactoryImpl("ProcessReplyMessageThread_")); + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.replyThreadPoolQueue, + new ThreadFactoryImpl("ProcessReplyMessageThread_")); this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), @@ -513,9 +513,9 @@ public class BrokerController { return; } - for (AccessValidator accessValidator: accessValidators) { + for (AccessValidator accessValidator : accessValidators) { final AccessValidator validator = accessValidator; - accessValidatorMap.put(validator.getClass(),validator); + accessValidatorMap.put(validator.getClass(), validator); this.registerServerRPCHook(new RPCHook() { @Override @@ -531,14 +531,13 @@ public class BrokerController { } } - private void initialRpcHooks() { List rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class); if (rpcHooks == null || rpcHooks.isEmpty()) { return; } - for (RPCHook rpcHook: rpcHooks) { + for (RPCHook rpcHook : rpcHooks) { this.registerServerRPCHook(rpcHook); } } @@ -576,7 +575,6 @@ public class BrokerController { this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); - /** * QueryMessageProcessor */ @@ -887,8 +885,6 @@ public class BrokerController { handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); } - - this.registerBrokerAll(true, false, true); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -911,7 +907,6 @@ public class BrokerController { this.brokerFastFailure.start(); } - } public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { @@ -1125,7 +1120,6 @@ public class BrokerController { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } - public BlockingQueue getEndTransactionThreadPoolQueue() { return endTransactionThreadPoolQueue; @@ -1146,8 +1140,7 @@ public class BrokerController { public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); - } - catch (Throwable e) { + } catch (Throwable e) { log.error("ScheduledTask SlaveSynchronize syncAll error.", e); } } @@ -1193,8 +1186,6 @@ public class BrokerController { log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId); } - - public void changeToMaster(BrokerRole role) { if (role == BrokerRole.SLAVE) { return; @@ -1244,6 +1235,4 @@ public class BrokerController { } } - - } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index db372fa8..bd007b05 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; - import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -28,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -46,6 +44,7 @@ public class ProducerManager { new HashMap>(); private final ConcurrentHashMap clientChannelTable = new ConcurrentHashMap<>(); private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); + public ProducerManager() { } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index 00960d79..23e1e9ac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -1,11 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; -import org.apache.rocketmq.common.*; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader; @@ -16,7 +38,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.store.*; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.stats.BrokerStatsManager; public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { @@ -26,7 +49,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext = null; SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { @@ -49,13 +73,13 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen switch (request.getCode()) { case RequestCode.SEND_REPLY_MESSAGE_V2: requestHeaderV2 = - (SendMessageRequestHeaderV2) request - .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + (SendMessageRequestHeaderV2) request + .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); case RequestCode.SEND_REPLY_MESSAGE: if (null == requestHeaderV2) { requestHeader = - (SendMessageRequestHeader) request - .decodeCommandCustomHeader(SendMessageRequestHeader.class); + (SendMessageRequestHeader) request + .decodeCommandCustomHeader(SendMessageRequestHeader.class); } else { requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); } @@ -66,11 +90,11 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen } private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx, - final RemotingCommand request, - final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) { + final RemotingCommand request, + final SendMessageContext sendMessageContext, + final SendMessageRequestHeader requestHeader) { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); - final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); response.setOpaque(request.getOpaque()); @@ -126,7 +150,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen return response; } - private boolean pushReplyMessage(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final Message msg, final RemotingCommand response) { + private boolean pushReplyMessage(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, + final Message msg, final RemotingCommand response) { ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader(); replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString()); replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString()); @@ -194,12 +219,13 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen } private void handlePutMessageResult(PutMessageResult putMessageResult, final RemotingCommand response, - final RemotingCommand request, final MessageExt msg, - final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, - int queueIdInt) { + final RemotingCommand request, final MessageExt msg, + final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, + ChannelHandlerContext ctx, + int queueIdInt) { if (putMessageResult == null) { response.setRemark("push reply to requester success, but store putMessage return null"); - return ; + return; } boolean sendOK = false; @@ -231,12 +257,12 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen case PROPERTIES_SIZE_EXCEEDED: // response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark( - "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); break; case SERVICE_NOT_AVAILABLE: // response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); response.setRemark( - "service not available now, maybe disk full, maybe your broker machine memory too small."); + "service not available now, maybe disk full, maybe your broker machine memory too small."); break; case OS_PAGECACHE_BUSY: // response.setCode(ResponseCode.SYSTEM_ERROR); @@ -256,7 +282,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen if (sendOK) { this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); + putMessageResult.getAppendMessageResult().getWroteBytes()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); response.setRemark(null); responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); @@ -270,7 +296,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); @@ -280,7 +306,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen } else { if (hasSendMessageHook()) { int wroteSize = request.getBody().length; - int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); sendMessageContext.setCommercialSendTimes(incValue); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 577fff57..cb290111 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -34,11 +34,11 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; public class TopicConfigManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java index fd338d60..2d756ece 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java @@ -1,7 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.client.exception; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.help.FAQUrl; public class RequestTimeoutException extends Exception { private static final long serialVersionUID = -5758410930844185841L; @@ -16,7 +32,7 @@ public class RequestTimeoutException extends Exception { public RequestTimeoutException(int responseCode, String errorMessage) { super("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " - + errorMessage); + + errorMessage); this.responseCode = responseCode; this.errorMessage = errorMessage; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 7a17a043..e9072df3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -16,13 +16,12 @@ */ package org.apache.rocketmq.client.impl; +import io.netty.channel.ChannelHandlerContext; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; - -import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.MQProducerInner; @@ -30,7 +29,11 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.RequestFutureTable; import org.apache.rocketmq.client.producer.RequestResponseFuture; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -43,8 +46,8 @@ import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRe import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; -import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -52,7 +55,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; - public class ClientRemotingProcessor implements NettyRequestProcessor { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; @@ -221,7 +223,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { } private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); long receiveTime = System.currentTimeMillis(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index f780adf9..d478d741 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; @@ -304,8 +303,8 @@ public class MQClientAPIImpl { requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm()); requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm()); requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); - requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),",")); - requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),",")); + requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ",")); + requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ",")); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader); @@ -344,7 +343,7 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis) + public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader(); @@ -366,7 +365,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, + public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr, + final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null); @@ -389,7 +389,7 @@ public class MQClientAPIImpl { } throw new MQBrokerException(response.getCode(), response.getRemark()); - + } public SendResult sendMessage( diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 7a0bc4cd..6e15ed1a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -103,19 +103,17 @@ public class DefaultMQProducerImpl implements MQProducerInner { new ConcurrentHashMap(); private final ArrayList sendMessageHookList = new ArrayList(); private final RPCHook rpcHook; + private final BlockingQueue asyncSenderThreadPoolQueue; + private final ExecutorService defaultAsyncSenderExecutor; + private final Timer timer = new Timer("RequestHouseKeepingService", true); protected BlockingQueue checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private ArrayList checkForbiddenHookList = new ArrayList(); private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); - private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); - - private final BlockingQueue asyncSenderThreadPoolQueue; - private final ExecutorService defaultAsyncSenderExecutor; private ExecutorService asyncSenderExecutor; - private final Timer timer = new Timer("RequestHouseKeepingService", true); public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); @@ -291,6 +289,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { /** * This method will be removed in the version 5.0.0 and getCheckListener is recommended. + * * @return */ @Override @@ -484,13 +483,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { * DEFAULT ASYNC ------------------------------------------------------- */ public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param sendCallback * @param timeout the sendCallback will be invoked at most time @@ -525,7 +525,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { } - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } @@ -701,11 +700,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private SendResult sendKernelImpl(final Message msg, - final MessageQueue mq, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final TopicPublishInfo topicPublishInfo, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { @@ -1010,8 +1009,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param mq * @param sendCallback @@ -1137,8 +1137,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param selector * @param arg @@ -1149,7 +1150,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { * @throws InterruptedException */ @Deprecated - public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); ExecutorService executor = this.getAsyncSenderExecutor(); @@ -1193,7 +1195,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter localTransactionExecuter, final Object arg) + final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { @@ -1330,7 +1332,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } - public Message request(Message msg, long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { + public Message request(Message msg, + long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); @@ -1356,7 +1359,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (responseMessage == null) { if (requestResponseFuture.isSendReqeustOk()) { throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); } else { throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); } @@ -1394,8 +1397,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException, RequestTimeoutException { + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException, RequestTimeoutException { prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); @@ -1421,7 +1424,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (responseMessage == null) { if (requestResponseFuture.isSendReqeustOk()) { throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); } else { throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); } @@ -1433,7 +1436,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void request(final Message msg, final MessageQueueSelector selector, final Object arg, - final RequestCallback requestCallback, final long timeout) throws RemotingException { + final RequestCallback requestCallback, final long timeout) throws RemotingException { prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); @@ -1460,7 +1463,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public Message request(final Message msg, final MessageQueue mq, final long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); @@ -1486,7 +1489,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (responseMessage == null) { if (requestResponseFuture.isSendReqeustOk()) { throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); } else { throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); } @@ -1498,7 +1501,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) - throws RemotingException { + throws RemotingException { prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); @@ -1537,7 +1540,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } - private void prepareSendRequest(final Message msg, long timeout){ + private void prepareSendRequest(final Message msg, long timeout) { String requestUniqId = RequestIdUtil.createUniqueRequestId(); String requestClientId = this.getmQClientFactory().getClientId(); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); @@ -1556,11 +1559,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } - private SendResult reply(final Message msg, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + private SendResult reply(final Message msg, + long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeoutMillis); } - private SendResult reply(final Message msg, final SendCallback sendCallback, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + private SendResult reply(final Message msg, final SendCallback sendCallback, + long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { return this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeoutMillis); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 4789c4d3..bb017ef2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -43,38 +43,29 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; /** - * This class is the entry point for applications intending to send messages. - *

+ * This class is the entry point for applications intending to send messages.

* * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of - * box for most scenarios. - *

+ * box for most scenarios.

* * This class aggregates various send methods to deliver messages to brokers. Each of them has pros and - * cons; you'd better understand strengths and weakness of them before actually coding. - *

+ * cons; you'd better understand strengths and weakness of them before actually coding.

* - *

- * Thread Safety: After configuring and starting process, this class can be regarded as thread-safe - * and used among multiple threads context. - *

+ *

Thread Safety: After configuring and starting process, this class can be regarded as thread-safe + * and used among multiple threads context.

*/ public class DefaultMQProducer extends ClientConfig implements MQProducer { - private final InternalLogger log = ClientLogger.getLog(); - /** * Wrapping internal implementations for virtually all methods presented in this class. */ protected final transient DefaultMQProducerImpl defaultMQProducerImpl; - + private final InternalLogger log = ClientLogger.getLog(); /** * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly - * important when transactional messages are involved. - *

+ * important when transactional messages are involved.

* - * For non-transactional messages, it does not matter as long as it's unique per process. - *

+ * For non-transactional messages, it does not matter as long as it's unique per process.

* * See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion. */ @@ -101,16 +92,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { private int compressMsgBodyOverHowmuch = 1024 * 4; /** - * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. - *

+ * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.

* * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendFailed = 2; /** - * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. - *

+ * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.

* * This may potentially cause message duplication which is up to application developers to resolve. */ @@ -269,14 +258,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Start this producer instance. - *

+ * Start this producer instance.

* - * - * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke - * this method before sending or querying messages. - * - *

+ * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must + * to invoke this method before sending or querying messages.

* * @throws MQClientException if there is any unexpected error. */ @@ -317,8 +302,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Send message in synchronous mode. This method returns only when the sending procedure totally completes. - *

+ * Send message in synchronous mode. This method returns only when the sending procedure totally completes.

* * Warn: this method has internal retry-mechanism, that is, internal implementation will retry * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially @@ -360,11 +344,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Send message to broker asynchronously. - *

+ * Send message to broker asynchronously.

* - * This method returns immediately. On sending completion, sendCallback will be executed. - *

+ * This method returns immediately. On sending completion, sendCallback will be executed.

* * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and @@ -584,7 +566,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * * @param msg * @param timeout * @return @@ -595,13 +576,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, - RemotingException, MQBrokerException, InterruptedException { + RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.request(msg, timeout); } /** - * * @param msg * @param requestCallback * @param timeout @@ -612,12 +592,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public void request(final Message msg, final RequestCallback requestCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException { + throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException { this.defaultMQProducerImpl.request(msg, requestCallback, timeout); } /** - * * @param msg * @param selector * @param arg @@ -630,13 +609,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException, RequestTimeoutException { + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException, RequestTimeoutException { return this.defaultMQProducerImpl.request(msg, selector, arg, timeout); } /** - * * @param msg * @param selector * @param arg @@ -649,13 +627,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public void request(final Message msg, final MessageQueueSelector selector, final Object arg, - final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, - InterruptedException { + final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, + InterruptedException { this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout); } /** - * * @param msg * @param mq * @param timeout @@ -667,12 +644,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public Message request(final Message msg, final MessageQueue mq, final long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { return this.defaultMQProducerImpl.request(msg, mq, timeout); } /** - * * @param msg * @param mq * @param requestCallback @@ -684,7 +660,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index ce8d4a50..a837c3d2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -102,22 +102,23 @@ public interface MQProducer extends MQAdmin { //for rpc Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, - RemotingException, MQBrokerException, InterruptedException; + RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final RequestCallback requestCallback, final long timeout) - throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException, MQBrokerException; + throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException, MQBrokerException; Message request(final Message msg, final MessageQueueSelector selector, final Object arg, - final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, - InterruptedException; + final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, + InterruptedException; void request(final Message msg, final MessageQueueSelector selector, final Object arg, - final RequestCallback requestCallback, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, - InterruptedException; + final RequestCallback requestCallback, + final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, + InterruptedException; Message request(final Message msg, final MessageQueue mq, final long timeout) - throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; + throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) - throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException; + throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java index 4a1874f9..3107ba57 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.client.producer; import org.apache.rocketmq.common.message.Message; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java index 74195be3..6ce53119 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java @@ -1,25 +1,41 @@ -package org.apache.rocketmq.client.producer; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import org.apache.rocketmq.client.common.ClientErrorCode; -import org.apache.rocketmq.client.exception.RequestTimeoutException; -import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.logging.InternalLogger; +package org.apache.rocketmq.client.producer; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.client.common.ClientErrorCode; +import org.apache.rocketmq.client.exception.RequestTimeoutException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.logging.InternalLogger; public class RequestFutureTable { private static InternalLogger log = ClientLogger.getLog(); private static ConcurrentHashMap requestFutureTable = new ConcurrentHashMap(); - public static ConcurrentHashMap getRequestFutureTable(){ + public static ConcurrentHashMap getRequestFutureTable() { return requestFutureTable; } - public static void scanExpiredRequest(){ + public static void scanExpiredRequest() { final List rfList = new LinkedList(); Iterator> it = requestFutureTable.entrySet().iterator(); while (it.hasNext()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java index a5c5feeb..d65b4794 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java @@ -1,32 +1,46 @@ -package org.apache.rocketmq.client.producer; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import org.apache.rocketmq.common.message.Message; +package org.apache.rocketmq.client.producer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.message.Message; public class RequestResponseFuture { private final String requestUniqId; - private long timeoutMillis; private final RequestCallback requestCallback; private final long beginTimestamp = System.currentTimeMillis(); + private final Message requestMsg = null; + private long timeoutMillis; private CountDownLatch countDownLatch = new CountDownLatch(1); - private AtomicBoolean ececuteCallbackOnlyOnce = new AtomicBoolean(false); private volatile Message responseMsg = null; private volatile boolean sendReqeustOk = true; private volatile Throwable cause = null; - private final Message requestMsg = null; - - public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback){ + public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback) { this.requestUniqId = requestUniqId; this.timeoutMillis = timeoutMillis; this.requestCallback = requestCallback; } - public void executeRequestCallback(){ + public void executeRequestCallback() { if (requestCallback != null) { if (sendReqeustOk && cause == null) { requestCallback.onSuccess(responseMsg); @@ -36,7 +50,7 @@ public class RequestResponseFuture { } } - public boolean isTimeout(){ + public boolean isTimeout() { long diff = System.currentTimeMillis() - this.beginTimestamp; return diff > this.timeoutMillis; } @@ -46,7 +60,7 @@ public class RequestResponseFuture { return this.responseMsg; } - public void putResponseMessage(final Message responseMsg){ + public void putResponseMessage(final Message responseMsg) { this.responseMsg = responseMsg; this.countDownLatch.countDown(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java index ce417752..62dd36dd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java +++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.client.utils; import org.apache.rocketmq.common.MixAll; diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index c3613e7d..a7568f0a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -769,7 +769,7 @@ public class BrokerConfig { public void setMsgTraceTopicName(String msgTraceTopicName) { this.msgTraceTopicName = msgTraceTopicName; } - + public boolean isTraceTopicEnable() { return traceTopicEnable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index a6151e55..0f7f0aa7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -45,8 +45,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; public class MixAll { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"; public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"; public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR"; @@ -74,30 +72,26 @@ public class MixAll { public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"; public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL"; public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_"; - public static final List LOCAL_INET_ADDRESS = getLocalInetAddress(); public static final String LOCALHOST = localhost(); public static final String DEFAULT_CHARSET = "UTF-8"; public static final long MASTER_ID = 0L; public static final long CURRENT_JVM_PID = getPID(); - public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; - public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC"; public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; - public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; - public static final String REPLY_MESSAGE_FLAG = "reply"; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public static String getWSAddr() { String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java index 465baeae..3bb09073 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.common.protocol.header; import org.apache.rocketmq.remoting.CommandCustomHeader; diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java index 9a58e5fd..86c5096d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java @@ -1,9 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.common.utils; import java.util.UUID; public class RequestIdUtil { - public static String createUniqueRequestId(){ + public static String createUniqueRequestId() { return UUID.randomUUID().toString(); } } -- GitLab