提交 97aa813e 编写于 作者: Y yukon

Reformat code globally second time

上级 7f96008c
......@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker;
import java.io.File;
public class BrokerPathConfigHelper {
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "config" + File.separator + "broker.properties";
......
......@@ -122,7 +122,8 @@ public class ConsumerManager {
return r1 || r2;
}
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) {
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
......
......@@ -54,9 +54,6 @@ public class ConsumerFilterData {
/**
* Check this filter data has been used to calculate bit map when msg was stored in server.
*
* @param msgStoreTime
* @return
*/
public boolean isMsgInLive(long msgStoreTime) {
return msgStoreTime > getBornTime();
......
......@@ -72,16 +72,11 @@ public class ConsumerFilterManager extends ConfigManager {
/**
* Build consumer filter data.Be care, bloom filter data is not included.
*
* @param topic
* @param consumerGroup
* @param expression
* @param type
* @param clientVersion
* @return maybe null
*/
public static ConsumerFilterData build(final String topic, final String consumerGroup,
final String expression, final String type,
final long clientVersion) {
final String expression, final String type,
final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return null;
}
......@@ -140,7 +135,7 @@ public class ConsumerFilterManager extends ConfigManager {
}
public boolean register(final String topic, final String consumerGroup, final String expression,
final String type, final long clientVersion) {
final String type, final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return false;
}
......@@ -357,7 +352,8 @@ public class ConsumerFilterManager extends ConfigManager {
data.setDeadTime(now);
}
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) {
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,
long clientVersion) {
ConsumerFilterData old = this.groupFilterData.get(consumerGroup);
if (old == null) {
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.filter;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -32,7 +31,8 @@ import java.util.Map;
* <br>It will decode properties first in order to get real topic.
*/
public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) {
public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
super(subscriptionData, consumerFilterData, consumerFilterManager);
}
......
......@@ -41,7 +41,7 @@ public class ExpressionMessageFilter implements MessageFilter {
protected final boolean bloomDataValid;
public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
ConsumerFilterManager consumerFilterManager) {
this.subscriptionData = subscriptionData;
this.consumerFilterData = consumerFilterData;
this.consumerFilterManager = consumerFilterManager;
......
......@@ -111,7 +111,6 @@ public class FilterServerManager {
}
}
public void scanNotActiveChannel() {
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
......
......@@ -103,6 +103,7 @@ public class BrokerFastFailure {
}
}
}
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
......
......@@ -25,23 +25,28 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory,
final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
......
......@@ -30,7 +30,7 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
}
......
......@@ -114,7 +114,7 @@ public class PullRequestHoldService extends ServiceThread {
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
......
......@@ -118,7 +118,8 @@ public class ConsumerOffsetManager extends ConfigManager {
return groups;
}
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
......
......@@ -231,7 +231,8 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
public TopicConfigSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
......@@ -248,7 +249,8 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
......@@ -264,7 +266,8 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
public String getAllDelayOffset(
final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
......@@ -280,7 +283,8 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
......
......@@ -28,7 +28,9 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
private final ByteBuffer byteBufferHeader;
private final GetMessageResult getMessageResult;
/** Bytes which were transferred already. */
/**
* Bytes which were transferred already.
*/
private long transferred;
public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
......
......@@ -27,7 +27,9 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
private final ByteBuffer byteBufferHeader;
private final SelectMappedBufferResult selectMappedBufferResult;
/** Bytes which were transferred already. */
/**
* Bytes which were transferred already.
*/
private long transferred;
public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
......
......@@ -28,7 +28,9 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
private final ByteBuffer byteBufferHeader;
private final QueryMessageResult queryMessageResult;
/** Bytes which were transferred already. */
/**
* Bytes which were transferred already.
*/
private long transferred;
public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
......
......@@ -87,7 +87,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
@Override
public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
int maxMsgNums, final MessageFilter messageFilter) {
int maxMsgNums, final MessageFilter messageFilter) {
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
......
......@@ -116,7 +116,6 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......@@ -126,7 +125,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
return this.updateAndCreateTopic(ctx, request);
......@@ -212,7 +212,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return false;
}
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
......@@ -249,7 +250,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return null;
}
private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
......@@ -355,7 +357,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
final SearchOffsetRequestHeader requestHeader =
......@@ -371,7 +374,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
final GetMaxOffsetRequestHeader requestHeader =
......@@ -386,7 +390,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
final GetMinOffsetRequestHeader requestHeader =
......@@ -400,7 +405,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
final GetEarliestMsgStoretimeRequestHeader requestHeader =
......@@ -429,7 +435,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
......@@ -447,7 +454,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
......@@ -477,7 +485,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
String content = this.brokerController.getSubscriptionGroupManager().encode();
if (content != null && content.length() > 0) {
......@@ -503,7 +512,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteSubscriptionGroupRequestHeader requestHeader =
(DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
......@@ -517,7 +527,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetTopicStatsInfoRequestHeader requestHeader =
(GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
......@@ -565,7 +576,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerConnectionListRequestHeader requestHeader =
(GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
......@@ -604,7 +616,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetProducerConnectionListRequestHeader requestHeader =
(GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
......@@ -637,7 +650,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumeStatsRequestHeader requestHeader =
(GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
......@@ -658,7 +672,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
......@@ -770,7 +783,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
......@@ -787,7 +801,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetConsumerStatusRequestHeader requestHeader =
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
......@@ -798,7 +813,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
requestHeader.getClientAddr());
}
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryTopicConsumeByWhoRequestHeader requestHeader =
(QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
......@@ -820,7 +836,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand registerFilterServer(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
final RegisterFilterServerRequestHeader requestHeader =
......@@ -836,7 +853,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryConsumeTimeSpanRequestHeader requestHeader =
(QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
......@@ -924,8 +942,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
......@@ -933,7 +951,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
requestHeader.getClientId());
}
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryCorrectionOffsetHeader requestHeader =
(QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
......@@ -960,7 +979,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
......@@ -984,7 +1004,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
requestHeader.getClientId());
}
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
......@@ -1004,7 +1025,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
......@@ -1025,7 +1045,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
......@@ -1249,7 +1270,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
}
private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
QueryConsumeQueueRequestHeader requestHeader =
(QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);
......
......@@ -46,7 +46,8 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
......
......@@ -76,7 +76,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}
......@@ -138,7 +139,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
......@@ -286,11 +287,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
......@@ -314,8 +315,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
......@@ -437,15 +438,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
break;
......@@ -483,7 +484,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
}
private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) {
private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic,
final int queueId) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
long storeTimestamp = 0;
......@@ -528,7 +530,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
}
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
......
......@@ -60,7 +60,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
......@@ -245,8 +246,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return response;
}
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request,
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
......@@ -319,8 +320,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
......@@ -361,13 +360,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) {
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
return response;
return response;
}
boolean sendOK = false;
......@@ -462,17 +462,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
sendMessageContext.setCommercialOwner(owner);
}
}
return response;
return response;
}
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
......@@ -493,7 +493,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return response;
}
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
......@@ -509,7 +508,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
return response;
}
MessageExtBatch messageExtBatch = new MessageExtBatch();
......
......@@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager {
@Override
public String configFilePath() {
return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig()
.getStorePathRootDir());
.getStorePathRootDir());
}
@Override
......
......@@ -272,7 +272,7 @@ public class TopicConfigManager extends ConfigManager {
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
......@@ -292,7 +292,7 @@ public class TopicConfigManager extends ConfigManager {
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
......@@ -384,7 +384,7 @@ public class TopicConfigManager extends ConfigManager {
@Override
public String configFilePath() {
return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig()
.getStorePathRootDir());
.getStorePathRootDir());
}
@Override
......
......@@ -50,7 +50,7 @@ public class BrokerControllerTest {
}
@After
public void destory(){
public void destory() {
UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir()));
}
}
......@@ -51,7 +51,6 @@ public class CommitLogDispatcherCalcBitMapTest {
ConsumerFilterData nullBloomData = filterManager.get("topic0", "CID_1");
nullBloomData.setBloomFilterData(null);
CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig,
filterManager);
......
......@@ -97,7 +97,7 @@ public class MessageStoreWithFilterTest {
}
public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
boolean enableCqExt, int cqExtFileSize) {
boolean enableCqExt, int cqExtFileSize) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
......@@ -127,7 +127,7 @@ public class MessageStoreWithFilterTest {
new MessageArrivingListener() {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
// System.out.println(String.format("Msg coming: %s, %d, %d, %d",
// topic, queueId, logicOffset, tagsCode));
}
......@@ -154,7 +154,8 @@ public class MessageStoreWithFilterTest {
return master;
}
protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception {
protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
int msgCountPerTopic) throws Exception {
List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;
......
......@@ -142,15 +142,18 @@ public class PullMessageProcessorTest {
List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1];
ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
@Override public String hookName() {
@Override
public String hookName() {
return "TestHook";
}
@Override public void consumeMessageBefore(ConsumeMessageContext context) {
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
messageContext[0] = context;
}
@Override public void consumeMessageAfter(ConsumeMessageContext context) {
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
}
};
consumeMessageHookList.add(consumeMessageHook);
......
......@@ -94,15 +94,18 @@ public class SendMessageProcessorTest {
List<SendMessageHook> sendMessageHookList = new ArrayList<>();
final SendMessageContext[] sendMessageContext = new SendMessageContext[1];
SendMessageHook sendMessageHook = new SendMessageHook() {
@Override public String hookName() {
@Override
public String hookName() {
return null;
}
@Override public void sendMessageBefore(SendMessageContext context) {
@Override
public void sendMessageBefore(SendMessageContext context) {
sendMessageContext[0] = context;
}
@Override public void sendMessageAfter(SendMessageContext context) {
@Override
public void sendMessageAfter(SendMessageContext context) {
}
};
......@@ -115,7 +118,6 @@ public class SendMessageProcessorTest {
assertThat(sendMessageContext[0].getProducerGroup()).isEqualTo(group);
}
@Test
public void testProcessRequest_FlushTimeOut() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
......@@ -210,7 +212,8 @@ public class SendMessageProcessorTest {
final RemotingCommand request = createSendMsgCommand(RequestCode.SEND_MESSAGE);
final RemotingCommand[] response = new RemotingCommand[1];
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
response[0] = invocation.getArgument(0);
return null;
}
......
......@@ -32,7 +32,6 @@ public interface MQAdmin {
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @throws MQClientException
*/
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;
......@@ -44,7 +43,6 @@ public interface MQAdmin {
* @param newTopic topic name
* @param queueNum topic's queue number
* @param topicSysFlag topic system flag
* @throws MQClientException
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException;
......@@ -56,7 +54,6 @@ public interface MQAdmin {
* @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return offset
* @throws MQClientException
*/
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
......@@ -65,7 +62,6 @@ public interface MQAdmin {
*
* @param mq Instance of MessageQueue
* @return the max offset
* @throws MQClientException
*/
long maxOffset(final MessageQueue mq) throws MQClientException;
......@@ -74,7 +70,6 @@ public interface MQAdmin {
*
* @param mq Instance of MessageQueue
* @return the minimum offset
* @throws MQClientException
*/
long minOffset(final MessageQueue mq) throws MQClientException;
......@@ -83,7 +78,6 @@ public interface MQAdmin {
*
* @param mq Instance of MessageQueue
* @return the time in microseconds
* @throws MQClientException
*/
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
......@@ -92,10 +86,6 @@ public interface MQAdmin {
*
* @param offsetMsgId message id
* @return message
* @throws InterruptedException
* @throws MQBrokerException
* @throws RemotingException
* @throws MQClientException
*/
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
......@@ -109,21 +99,14 @@ public interface MQAdmin {
* @param begin from when
* @param end to when
* @return Instance of QueryResult
* @throws MQClientException
* @throws InterruptedException
*/
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
/**
* @param topic
* @param msgId
* @return The {@code MessageExt} of given msgId
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
* @throws MQClientException
*/
MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
\ No newline at end of file
......@@ -41,7 +41,6 @@ public class MQHelper {
* @param consumerGroup consumer group
* @param topic topic
* @param timestamp time
* @throws Exception
*/
public static void resetOffsetByTimestamp(
final MessageModel messageModel,
......
......@@ -35,8 +35,6 @@ public class Validators {
public static final int CHARACTER_MAX_LENGTH = 255;
/**
* @param origin
* @param patternStr
* @return The resulting {@code String}
*/
public static String getGroupWithRegularExpression(String origin, String patternStr) {
......@@ -50,9 +48,6 @@ public class Validators {
/**
* Validate group
*
* @param group
* @throws MQClientException
*/
public static void checkGroup(String group) throws MQClientException {
if (UtilAll.isBlank(group)) {
......@@ -69,8 +64,6 @@ public class Validators {
}
/**
* @param origin
* @param pattern
* @return <tt>true</tt> if, and only if, the entire origin sequence matches this matcher's pattern
*/
public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
......@@ -83,10 +76,6 @@ public class Validators {
/**
* Validate message
*
* @param msg
* @param defaultMQProducer
* @throws MQClientException
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
......@@ -113,9 +102,6 @@ public class Validators {
/**
* Validate topic
*
* @param topic
* @throws MQClientException
*/
public static void checkTopic(String topic) throws MQClientException {
if (UtilAll.isBlank(topic)) {
......
......@@ -264,7 +264,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
}
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
}
......@@ -276,7 +277,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
}
@Override
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
}
......@@ -297,7 +299,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
}
@Override
public MessageExt viewMessage(String topic, String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(String topic,
String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageDecoder.decodeMessageId(uniqKey);
return this.viewMessage(uniqKey);
......
......@@ -51,7 +51,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* </p>
*
* <p>
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
......@@ -90,29 +90,29 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* There are three consuming points:
* <ul>
* <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two
* cases:
* <ol>
* <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet
* expired, which means the consumer group represents a lately launched business, consuming will
* start from the very beginning;
* </li>
* <li>
* if the earliest message being subscribed has expired, consuming will start from the latest
* messages, meaning messages born prior to the booting timestamp would be ignored.
* </li>
* </ol>
* </li>
* <li>
* <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
* </li>
* <li>
* <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
* messages born prior to {@link #consumeTimestamp} will be ignored
* </li>
* <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two
* cases:
* <ol>
* <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet
* expired, which means the consumer group represents a lately launched business, consuming will
* start from the very beginning;
* </li>
* <li>
* if the earliest message being subscribed has expired, consuming will start from the latest
* messages, meaning messages born prior to the booting timestamp would be ignored.
* </li>
* </ol>
* </li>
* <li>
* <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
* </li>
* <li>
* <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
* messages born prior to {@link #consumeTimestamp} will be ignored
* </li>
* </ul>
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
......@@ -223,11 +223,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
......@@ -235,6 +237,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(RPCHook rpcHook) {
......@@ -243,6 +246,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultMQPushConsumer(final String consumerGroup) {
......@@ -280,7 +284,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
}
......@@ -291,7 +296,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
@Override
public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
......@@ -411,6 +417,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Send message back to broker which will be re-delivered in future.
*
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
......@@ -449,6 +456,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
*
* @throws MQClientException if there is any client error.
*/
@Override
......@@ -498,7 +506,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* if null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
......@@ -508,10 +516,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to consume.
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
* @throws MQClientException
*/
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
......@@ -521,12 +529,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Subscribe a topic by message selector.
*
* @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
* @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
*
* @param topic topic to consume.
* @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
* @throws MQClientException
* @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
* @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
*/
@Override
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
......@@ -535,6 +541,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Un-subscribe the specified topic from subscription.
*
* @param topic message topic
*/
@Override
......
......@@ -30,13 +30,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public interface MQConsumer extends MQAdmin {
/**
* If consuming failure,message will be send back to the brokers,and delay consuming some time
*
* @param msg
* @param delayLevel
* @throws InterruptedException
* @throws MQBrokerException
* @throws RemotingException
* @throws MQClientException
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
......@@ -44,14 +37,6 @@ public interface MQConsumer extends MQAdmin {
/**
* If consuming failure,message will be send back to the broker,and delay consuming some time
*
* @param msg
* @param delayLevel
* @param brokerName
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
* @throws MQClientException
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
......@@ -61,7 +46,6 @@ public interface MQConsumer extends MQAdmin {
*
* @param topic message topic
* @return queue set
* @throws MQClientException
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
}
......@@ -29,8 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public interface MQPullConsumer extends MQConsumer {
/**
* Start the consumer
*
* @throws MQClientException
*/
void start() throws MQClientException;
......@@ -41,9 +39,6 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Register the message queue listener
*
* @param topic
* @param listener
*/
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
......@@ -51,15 +46,12 @@ public interface MQPullConsumer extends MQConsumer {
* Pulling the messages,not blocking
*
* @param mq from which message queue
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if null or * expression,meaning subscribe
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe
* all
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
* @throws MQClientException
* @throws InterruptedException
* @throws MQBrokerException
* @throws RemotingException
*/
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
......@@ -68,16 +60,7 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Pulling the messages in the specified timeout
*
* @param mq
* @param subExpression
* @param offset
* @param maxNums
* @param timeout
* @return The resulting {@code PullRequest}
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
final int maxNums, final long timeout) throws MQClientException, RemotingException,
......@@ -85,15 +68,6 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Pulling the messages in a async. way
*
* @param mq
* @param subExpression
* @param offset
* @param maxNums
* @param pullCallback
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
......@@ -101,16 +75,6 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Pulling the messages in a async. way
*
* @param mq
* @param subExpression
* @param offset
* @param maxNums
* @param pullCallback
* @param timeout
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
......@@ -119,15 +83,7 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Pulling the messages,if no message arrival,blocking some time
*
* @param mq
* @param subExpression
* @param offset
* @param maxNums
* @return The resulting {@code PullRequest}
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
final long offset, final int maxNums) throws MQClientException, RemotingException,
......@@ -135,15 +91,6 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Pulling the messages through callback function,if no message arrival,blocking.
*
* @param mq
* @param subExpression
* @param offset
* @param maxNums
* @param pullCallback
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,
final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
......@@ -151,20 +98,13 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Update the offset
*
* @param mq
* @param offset
* @throws MQClientException
*/
void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException;
/**
* Fetch the offset
*
* @param mq
* @param fromStore
* @return The fetched offset of given queue
* @throws MQClientException
*/
long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException;
......@@ -173,22 +113,12 @@ public interface MQPullConsumer extends MQConsumer {
*
* @param topic message topic
* @return message queue set
* @throws MQClientException
*/
Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException;
/**
* If consuming failure,message will be send back to the broker,and delay consuming in some time later.<br>
* Mind! message can only be consumed in the same group.
*
* @param msg
* @param delayLevel
* @param brokerName
* @param consumerGroup
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
* @throws MQClientException
*/
void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
......
......@@ -27,8 +27,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
public interface MQPushConsumer extends MQConsumer {
/**
* Start the consumer
*
* @throws MQClientException
*/
void start() throws MQClientException;
......@@ -39,8 +37,6 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Register the message listener
*
* @param messageListener
*/
@Deprecated
void registerMessageListener(MessageListener messageListener);
......@@ -52,22 +48,20 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Subscribe some topic
*
* @param topic
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if null or * expression,meaning subscribe
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe
* all
* @throws MQClientException
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* Subscribe some topic
*
* @param topic
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
* @throws MQClientException
*/
void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
void subscribe(final String topic, final String fullClassName,
final String filterClassSource) throws MQClientException;
/**
* Subscribe some topic with selector.
......@@ -84,9 +78,7 @@ public interface MQPushConsumer extends MQConsumer {
* Choose SQL92: {@link MessageSelector#bySql(java.lang.String)}
* </p>
*
* @param topic
* @param selector message selector({@link MessageSelector}), can be null.
* @throws MQClientException
*/
void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
......@@ -99,8 +91,6 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Update the consumer thread pool size Dynamically
*
* @param corePoolSize
*/
void updateCorePoolSize(int corePoolSize);
......
......@@ -20,14 +20,13 @@ package org.apache.rocketmq.client.consumer;
import org.apache.rocketmq.common.filter.ExpressionType;
/**
*
* Message selector: select message at server.
* <p>
* Now, support:
* <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG}
* </li>
* <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
* </li>
* Now, support:
* <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG}
* </li>
* <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
* </li>
* </p>
*/
public class MessageSelector {
......@@ -51,7 +50,6 @@ public class MessageSelector {
* Use SLQ92 to select message.
*
* @param sql if null or empty, will be treated as select all message.
* @return
*/
public static MessageSelector bySql(String sql) {
return new MessageSelector(ExpressionType.SQL92, sql);
......@@ -61,7 +59,6 @@ public class MessageSelector {
* Use tag to select message.
*
* @param tag if null or empty or "*", will be treated as select all message.
* @return
*/
public static MessageSelector byTag(String tag) {
return new MessageSelector(ExpressionType.TAG, tag);
......
......@@ -24,10 +24,10 @@ import org.apache.rocketmq.common.message.MessageExt;
*/
public interface MessageListenerConcurrently extends MessageListener {
/**
* It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
* It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if
* consumption failure
*
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param context
* @return The consume status
*/
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
......
......@@ -20,14 +20,15 @@ import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
/**
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one
* thread
*/
public interface MessageListenerOrderly extends MessageListener {
/**
* It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure
* It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
* if consumption failure
*
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param context
* @return The consume status
*/
ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
......
......@@ -30,7 +30,7 @@ import org.slf4j.Logger;
/**
* Consistent Hashing queue algorithm
*/
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
private final int virtualNodeCnt;
......@@ -41,7 +41,7 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
this(virtualNodeCnt,null);
this(virtualNodeCnt, null);
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
......@@ -75,7 +75,6 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
return result;
}
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
......@@ -105,7 +104,6 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
return "CONSISTENT_HASH";
}
private static class ClientNode implements Node {
private final String clientID;
......@@ -119,6 +117,4 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
}
}
}
......@@ -29,52 +29,37 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public interface OffsetStore {
/**
* Load
*
* @throws MQClientException
*/
void load() throws MQClientException;
/**
* Update the offset,store it in memory
*
* @param mq
* @param offset
* @param increaseOnly
*/
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
/**
* Get offset from local storage
*
* @param mq
* @param type
* @return The fetched offset
*/
long readOffset(final MessageQueue mq, final ReadOffsetType type);
/**
* Persist all offsets,may be in local storage or remote name server
*
* @param mqs
*/
void persistAll(final Set<MessageQueue> mqs);
/**
* Persist the offset,may be in local storage or remote name server
*
* @param mq
*/
void persist(final MessageQueue mq);
/**
* Remove offset
*
* @param mq
*/
void removeOffset(MessageQueue mq);
/**
* @param topic
* @return The cloned offset table of given topic
*/
Map<MessageQueue, Long> cloneOffsetTable(String topic);
......
......@@ -55,7 +55,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
......@@ -82,7 +83,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return false;
}
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
......@@ -107,7 +109,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return null;
}
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
final NotifyConsumerIdsChangedRequestHeader requestHeader =
(NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
......@@ -121,12 +124,13 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return null;
}
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp());
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp());
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
if (request.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
......@@ -137,7 +141,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
}
@Deprecated
public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerStatusRequestHeader requestHeader =
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
......@@ -150,7 +155,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
......@@ -173,7 +179,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
(ConsumeMessageDirectlyResultRequestHeader) request
......
......@@ -240,7 +240,8 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MessageId messageId = null;
try {
......@@ -252,12 +253,14 @@ public class MQAdminImpl {
messageId.getOffset(), timeoutMillis);
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
public MessageExt queryMessageByUniqKey(String topic,
String uniqKey) throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
......@@ -268,7 +271,8 @@ public class MQAdminImpl {
}
}
protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException,
protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end,
boolean isUniqKey) throws MQClientException,
InterruptedException {
TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
if (null == topicRouteData) {
......
......@@ -62,7 +62,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
......@@ -204,7 +205,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
}
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) {
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue,
final long delayMills) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
......
......@@ -55,7 +55,8 @@ public abstract class RebalanceImpl {
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
public RebalanceImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory) {
this.consumerGroup = consumerGroup;
this.messageModel = messageModel;
......@@ -327,7 +328,8 @@ public abstract class RebalanceImpl {
}
}
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
......@@ -400,7 +402,8 @@ public abstract class RebalanceImpl {
return changed;
}
public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided);
public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
......
......@@ -32,7 +32,8 @@ public class RebalancePullImpl extends RebalanceImpl {
this(null, null, null, null, defaultMQPullConsumerImpl);
}
public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
public RebalancePullImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
......
......@@ -40,7 +40,8 @@ public class RebalancePushImpl extends RebalanceImpl {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
......
......@@ -584,7 +584,8 @@ public class MQClientInstance {
}
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
......@@ -719,7 +720,8 @@ public class MQClientInstance {
return false;
}
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, final String topic,
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
final String topic,
final String filterClassSource) throws UnsupportedEncodingException {
byte[] classBody = null;
int classCRC = 0;
......
......@@ -248,7 +248,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
@Override
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
......@@ -386,7 +387,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
}
public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
......@@ -407,7 +409,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* DEFAULT ASYNC -------------------------------------------------------
*/
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
......@@ -863,7 +866,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* KERNEL ONEWAY -------------------------------------------------------
*/
public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
......@@ -945,7 +949,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
......@@ -1013,7 +1018,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* DEFAULT SYNC -------------------------------------------------------
*/
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
......@@ -1054,7 +1060,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
......
......@@ -36,7 +36,7 @@ public class ClientLogger {
private static Logger createLogger(final String loggerName) {
String logConfigFilePath =
System.getProperty("rocketmq.client.log.configFile",
System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
Boolean isloadconfig =
Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
......
......@@ -48,8 +48,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* </p>
*
* <p>
* <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context.
* <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context.
* </p>
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
......@@ -137,6 +137,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
......@@ -145,6 +146,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying the RPC hook.
*
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(RPCHook rpcHook) {
......@@ -178,6 +180,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Fetch message queues of topic <code>topic</code>, to which we may send/publish messages.
*
* @param topic Topic to fetch.
* @return List of message queues readily to send messages to
* @throws MQClientException if there is any client error.
......@@ -204,12 +207,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
/**
* Same to {@link #send(Message)} with send timeout specified in addition.
*
* @param msg Message to send.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
......@@ -220,7 +225,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
......@@ -234,6 +240,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to
* {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication
* and application developers are the one to resolve this potential issue.
*
* @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
* @throws MQClientException if there is any client error.
......@@ -241,12 +248,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
/**
* Same to {@link #send(Message, SendCallback)} with send timeout specified in addition.
*
* @param msg message to send.
* @param sendCallback Callback to execute.
* @param timeout send timeout.
......@@ -263,6 +272,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
* acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
*
* @param msg Message to send.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
......@@ -275,6 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message)} with target message queue specified in addition.
*
* @param msg Message to send.
* @param mq Target message queue.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
......@@ -327,6 +338,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified.
*
* @param msg Message to send.
* @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
......@@ -343,6 +355,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #sendOneway(Message)} with target message queue specified.
*
* @param msg Message to send.
* @param mq Target message queue.
* @throws MQClientException if there is any client error.
......@@ -350,12 +363,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, mq);
}
/**
* Same to {@link #send(Message)} with message queue selector specified.
* Same to {@link #send(Message)} with message queue selector specified.
*
* @param msg Message to send.
* @param selector Message queue selector, through which we get target message queue to deliver message to.
......@@ -430,6 +444,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
* @param msg Message to send.
* @param selector Message queue selector, through which to determine target message queue to deliver message
* @param arg Argument used along with message queue selector.
......@@ -453,13 +468,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws MQClientException if there is any client error.
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
final Object arg)
throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* Create a topic on broker.
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
......@@ -472,6 +489,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Create a topic on broker.
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
......@@ -485,6 +503,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Search consume queue offset of the given time stamp.
*
* @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return Consume queue offset.
......@@ -509,6 +528,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query minimum offset of the given message queue.
*
* @param mq Instance of MessageQueue
* @return minimum offset of the given message queue.
* @throws MQClientException if there is any client error.
......@@ -520,6 +540,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query earliest message store time.
*
* @param mq Instance of MessageQueue
* @return earliest message store time.
* @throws MQClientException if there is any client error.
......@@ -531,6 +552,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given offset message ID.
*
* @param offsetMsgId message id
* @return Message specified.
* @throws MQBrokerException if there is any broker error.
......@@ -539,12 +561,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
}
/**
* Query message by key.
*
* @param topic message topic
* @param key message key index word
* @param maxNum max message number
......@@ -572,7 +596,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
......@@ -582,22 +607,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
@Override
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
@Override
public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(Collection<Message> msgs,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}
@Override
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(Collection<Message> msgs,
MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
}
@Override
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}
......@@ -615,6 +644,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
return msgBatch;
}
public String getProducerGroup() {
return producerGroup;
}
......
......@@ -32,7 +32,8 @@ public class SendResult {
public SendResult() {
}
public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue,
long queueOffset) {
this.sendStatus = sendStatus;
this.msgId = msgId;
this.offsetMsgId = offsetMsgId;
......@@ -40,7 +41,8 @@ public class SendResult {
this.queueOffset = queueOffset;
}
public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId,
public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue,
final long queueOffset, final String transactionId,
final String offsetMsgId, final String regionId) {
this.sendStatus = sendStatus;
this.msgId = msgId;
......
......@@ -86,7 +86,8 @@ public class DefaultMQPullConsumerTest {
@Test
public void testPullMessage_Success() throws Exception {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
}
......@@ -103,9 +104,10 @@ public class DefaultMQPullConsumerTest {
}
@Test
public void testPullMessage_NotFound() throws Exception{
public void testPullMessage_NotFound() throws Exception {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
}
......@@ -119,7 +121,8 @@ public class DefaultMQPullConsumerTest {
@Test
public void testPullMessageAsync_Success() throws Exception {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
......@@ -131,7 +134,8 @@ public class DefaultMQPullConsumerTest {
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
@Override public void onSuccess(PullResult pullResult) {
@Override
public void onSuccess(PullResult pullResult) {
assertThat(pullResult).isNotNull();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
......@@ -140,13 +144,15 @@ public class DefaultMQPullConsumerTest {
assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<Object>());
}
@Override public void onException(Throwable e) {
@Override
public void onException(Throwable e) {
}
});
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception {
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
}
}
\ No newline at end of file
......@@ -90,7 +90,8 @@ public class DefaultMQPushConsumerTest {
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
......@@ -109,7 +110,6 @@ public class DefaultMQPushConsumerTest {
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
......@@ -125,21 +125,22 @@ public class DefaultMQPushConsumerTest {
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
......@@ -159,7 +160,8 @@ public class DefaultMQPushConsumerTest {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
......@@ -217,7 +219,8 @@ public class DefaultMQPushConsumerTest {
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception {
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
......
......@@ -29,7 +29,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AllocateMessageQueueConsitentHashTest {
private String topic;
......@@ -40,8 +39,6 @@ public class AllocateMessageQueueConsitentHashTest {
topic = "topic_test";
}
public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
if (messageQueueList == null || messageQueueList.size() < 1)
return;
......@@ -85,28 +82,27 @@ public class AllocateMessageQueueConsitentHashTest {
@Test
public void testAllocate1() {
testAllocate(20,10);
testAllocate(20, 10);
}
@Test
public void testAllocate2() {
testAllocate(10,20);
testAllocate(10, 20);
}
@Test
public void testRun100RandomCase(){
for(int i=0;i<100;i++){
int consumerSize = new Random().nextInt(200)+1;//1-200
int queueSize = new Random().nextInt(100)+1;//1-100
testAllocate(queueSize,consumerSize);
public void testRun100RandomCase() {
for (int i = 0; i < 100; i++) {
int consumerSize = new Random().nextInt(200) + 1;//1-200
int queueSize = new Random().nextInt(100) + 1;//1-100
testAllocate(queueSize, consumerSize);
try {
Thread.sleep(1);
} catch (InterruptedException e) {}
} catch (InterruptedException e) {
}
}
}
public void testAllocate(int queueSize, int consumerSize) {
AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3);
......@@ -133,7 +129,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
Assert.assertTrue(
verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
verifyAllocateAll(cidBegin, mqAll, allocatedResAll));
}
Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<MessageQueue, String>();
......@@ -162,7 +158,7 @@ public class AllocateMessageQueueConsitentHashTest {
//System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
}
Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove,
Assert.assertTrue("queueSize" + queueSize + "consumerSize:" + consumerSize + "\nmqAll:" + mqAll + "\nallocatedResAllAfterRemove" + allocatedResAllAfterRemove,
verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove));
verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
}
......@@ -170,7 +166,7 @@ public class AllocateMessageQueueConsitentHashTest {
List<String> cidAfterAdd = new ArrayList<String>(cidAfterRemoveOne);
//test allocate add one more cid
{
String newCid = CID_PREFIX+"NEW";
String newCid = CID_PREFIX + "NEW";
//System.out.println("add one more cid "+newCid);
cidAfterAdd.add(newCid);
List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
......@@ -182,7 +178,7 @@ public class AllocateMessageQueueConsitentHashTest {
allocatedResAllAfterAdd.addAll(rs);
for (MessageQueue mq : rs) {
allocateToAll3.put(mq, cid);
if (cid.equals(newCid)){
if (cid.equals(newCid)) {
mqShouldOnlyChanged.add(mq);
}
}
......@@ -190,19 +186,21 @@ public class AllocateMessageQueueConsitentHashTest {
}
Assert.assertTrue(
verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd));
verifyAllocateAll(cidAfterAdd, mqAll, allocatedResAllAfterAdd));
verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid);
}
}
private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
if (cidAll.isEmpty()){
private boolean verifyAllocateAll(List<String> cidAll, List<MessageQueue> mqAll,
List<MessageQueue> allocatedResAll) {
if (cidAll.isEmpty()) {
return allocatedResAll.isEmpty();
}
return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
}
private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) {
private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter,
String removeCID) {
for (MessageQueue mq : allocateToBefore.keySet()) {
String allocateToOrigin = allocateToBefore.get(mq);
if (allocateToOrigin.equals(removeCID)) {
......@@ -213,14 +211,15 @@ public class AllocateMessageQueueConsitentHashTest {
}
}
private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) {
private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter,
String newCID) {
for (MessageQueue mq : allocateAfter.keySet()) {
String allocateToOrigin = allocateBefore.get(mq);
String allocateToAfter = allocateAfter.get(mq);
if (allocateToAfter.equals(newCID)) {
} else {//the rest queue should be the same
Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
Assert.assertTrue("it was allocated to " + allocateToOrigin + ". Now, it is to " + allocateAfter.get(mq) + " mq:" + mq, allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
}
}
}
......
......@@ -99,7 +99,8 @@ public class RemoteBrokerOffsetStoreTest {
final MessageQueue messageQueue = new MessageQueue(topic, brokerName, 3);
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
UpdateConsumerOffsetRequestHeader updateRequestHeader = mock.getArgument(1);
when(mqClientAPI.queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong())).thenReturn(updateRequestHeader.getCommitOffset());
return null;
......@@ -123,8 +124,6 @@ public class RemoteBrokerOffsetStoreTest {
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
}
@Test
public void testRemoveOffset() throws Exception {
OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
......
......@@ -107,7 +107,8 @@ public class MQClientAPIImplTest {
@Test
public void testSendMessageSync_Success() throws InterruptedException, RemotingException, MQBrokerException {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createSuccessResponse(request);
}
......@@ -127,7 +128,8 @@ public class MQClientAPIImplTest {
@Test
public void testSendMessageSync_WithException() throws InterruptedException, RemotingException, MQBrokerException {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SYSTEM_ERROR);
......@@ -156,7 +158,8 @@ public class MQClientAPIImplTest {
assertThat(sendResult).isNull();
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
......@@ -169,14 +172,16 @@ public class MQClientAPIImplTest {
sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
@Override
public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
}
@Override public void onException(Throwable e) {
@Override
public void onException(Throwable e) {
}
},
null, null, 0, sendMessageContext, defaultMQProducerImpl);
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar";
private String group = "FooBarGroup";
......
......@@ -149,13 +149,16 @@ public class DefaultMQProducerTest {
final Throwable[] assertionErrors = new Throwable[1];
final CountDownLatch countDownLatch = new CountDownLatch(2);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
@Override public String hookName() {
@Override
public String hookName() {
return "TestHook";
}
@Override public void sendMessageBefore(final SendMessageContext context) {
@Override
public void sendMessageBefore(final SendMessageContext context) {
assertionErrors[0] = assertInOtherThread(new Runnable() {
@Override public void run() {
@Override
public void run() {
assertThat(context.getMessage()).isEqualTo(message);
assertThat(context.getProducer()).isEqualTo(producer);
assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
......@@ -165,9 +168,11 @@ public class DefaultMQProducerTest {
countDownLatch.countDown();
}
@Override public void sendMessageAfter(final SendMessageContext context) {
@Override
public void sendMessageAfter(final SendMessageContext context) {
assertionErrors[0] = assertInOtherThread(new Runnable() {
@Override public void run() {
@Override
public void run() {
assertThat(context.getMessage()).isEqualTo(message);
assertThat(context.getProducer()).isEqualTo(producer.getDefaultMQProducerImpl());
assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
......@@ -229,7 +234,8 @@ public class DefaultMQProducerTest {
private Throwable assertInOtherThread(final Runnable runnable) {
final Throwable[] assertionErrors = new Throwable[1];
Thread thread = new Thread(new Runnable() {
@Override public void run() {
@Override
public void run() {
try {
runnable.run();
} catch (AssertionError e) {
......
......@@ -54,7 +54,8 @@ public class BrokerConfig {
private String messageStorePlugIn = "";
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
......
......@@ -65,7 +65,6 @@ public class Configuration {
/**
* register config object
*
* @param configObject
* @return the current Configuration object
*/
public Configuration registerConfig(Object configObject) {
......@@ -91,7 +90,6 @@ public class Configuration {
/**
* register config properties
*
* @param extProperties
* @return the current Configuration object
*/
public Configuration registerConfig(Properties extProperties) {
......@@ -117,8 +115,6 @@ public class Configuration {
/**
* The store path will be gotten from the field of object.
*
* @param object
* @param fieldName
* @throws java.lang.RuntimeException if the field of object is not exist.
*/
public void setStorePathFromConfig(Object object, String fieldName) {
......
......@@ -29,7 +29,8 @@ public class CountDownLatch2 {
/**
* Constructs a {@code CountDownLatch2} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}
* @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link
* #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch2(int count) {
......@@ -104,7 +105,8 @@ public class CountDownLatch2 {
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count reached zero
* @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count
* reached zero
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public boolean await(long timeout, TimeUnit unit)
......@@ -176,7 +178,7 @@ public class CountDownLatch2 {
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
for (; ; ) {
int c = getState();
if (c == 0)
return false;
......
......@@ -74,7 +74,7 @@ public class DataVersion extends RemotingSerializable {
int result = (int) (timestamp ^ (timestamp >>> 32));
if (null != counter) {
long l = counter.get();
result = 31 * result + (int)(l ^ (l >>> 32));
result = 31 * result + (int) (l ^ (l >>> 32));
}
return result;
}
......
......@@ -58,8 +58,8 @@ public class MixAll {
public static final String DEFAULT_NAMESRV_ADDR_LOOKUP = "jmenv.tbsite.net";
public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
// // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
// public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
//http://jmenv.tbsite.net:8080/rocketmq/nsaddr
//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String DEFAULT_TOPIC = "TBW102";
public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
......@@ -169,7 +169,6 @@ public class MixAll {
file.renameTo(new File(fileName));
}
public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
File file = new File(fileName);
File fileParent = file.getParentFile();
......@@ -250,7 +249,8 @@ public class MixAll {
printObjectProperties(logger, object, false);
}
public static void printObjectProperties(final Logger logger, final Object object, final boolean onlyImportantField) {
public static void printObjectProperties(final Logger logger, final Object object,
final boolean onlyImportantField) {
Field[] fields = object.getClass().getDeclaredFields();
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
......
......@@ -65,7 +65,7 @@ public abstract class ServiceThread implements Runnable {
}
long eclipseTime = System.currentTimeMillis() - beginTime;
log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ this.getJointime());
+ this.getJointime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
......
......@@ -209,7 +209,6 @@ public class UtilAll {
return -1;
}
public static int crc32(byte[] array) {
if (array != null) {
return crc32(array, 0, array.length);
......@@ -218,7 +217,6 @@ public class UtilAll {
return 0;
}
public static int crc32(byte[] array, int offset, int length) {
CRC32 crc32 = new CRC32();
crc32.update(array, offset, length);
......
......@@ -25,20 +25,18 @@ import java.util.TreeMap;
/**
* To hash Node objects to a hash ring with a certain amount of virtual node.
* Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
*
* @param <T>
* Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash
* algorithm
*/
public class ConsistentHashRouter<T extends Node> {
private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();
private final HashFunction hashFunction;
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
this(pNodes,vNodeCount, new MD5Hash());
this(pNodes, vNodeCount, new MD5Hash());
}
/**
*
* @param pNodes collections of physical nodes
* @param vNodeCount amounts of virtual nodes
* @param hashFunction hash Function to hash Node instances
......@@ -57,11 +55,13 @@ public class ConsistentHashRouter<T extends Node> {
/**
* add physic node to the hash ring with some virtual nodes
*
* @param pNode physical node needs added to hash ring
* @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
*/
public void addNode(T pNode, int vNodeCount) {
if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
if (vNodeCount < 0)
throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
int existingReplicas = getExistingReplicas(pNode);
for (int i = 0; i < vNodeCount; i++) {
VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
......@@ -71,7 +71,6 @@ public class ConsistentHashRouter<T extends Node> {
/**
* remove the physical node from the hash ring
* @param pNode
*/
public void removeNode(T pNode) {
Iterator<Long> it = ring.keySet().iterator();
......@@ -86,20 +85,19 @@ public class ConsistentHashRouter<T extends Node> {
/**
* with a specified key, route the nearest Node instance in the current hash ring
*
* @param objectKey the object key to find a nearest Node
* @return
*/
public T routeNode(String objectKey) {
if (ring.isEmpty()) {
return null;
}
Long hashVal = hashFunction.hash(objectKey);
SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
SortedMap<Long, VirtualNode<T>> tailMap = ring.tailMap(hashVal);
Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
return ring.get(nodeHashVal).getPhysicalNode();
}
public int getExistingReplicas(T pNode) {
int replicas = 0;
for (VirtualNode<T> vNode : ring.values()) {
......@@ -110,7 +108,6 @@ public class ConsistentHashRouter<T extends Node> {
return replicas;
}
//default hash function
private static class MD5Hash implements HashFunction {
MessageDigest instance;
......
......@@ -21,7 +21,6 @@ package org.apache.rocketmq.common.consistenthash;
*/
public interface Node {
/**
*
* @return the key which will be used for hash mapping
*/
String getKey();
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.common.consistenthash;
public class VirtualNode<T extends Node> implements Node {
final T physicalNode;
final int replicaIndex;
......
......@@ -65,7 +65,7 @@ public class FilterAPI {
}
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
final String type) throws Exception {
if (ExpressionType.TAG.equals(type) || type == null) {
return buildSubscriptionData(null, topic, subString);
}
......
......@@ -35,7 +35,6 @@ public class PolishExpr {
* Shunting-yard algorithm <br/>
* http://en.wikipedia.org/wiki/Shunting_yard_algorithm
*
* @param tokens
* @return the compute result of Shunting-yard algorithm
*/
public static List<Op> reversePolish(List<Op> tokens) {
......
......@@ -98,7 +98,6 @@ public class MessageDecoder {
* Just decode properties from msg buffer.
*
* @param byteBuffer msg commit log buffer.
* @return
*/
public static Map<String, String> decodeProperties(java.nio.ByteBuffer byteBuffer) {
int topicLengthPosition = BODY_SIZE_POSITION + 4 + byteBuffer.getInt(BODY_SIZE_POSITION);
......@@ -239,8 +238,6 @@ public class MessageDecoder {
return byteBuffer.array();
}
public static MessageExt decode(
java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
return decode(byteBuffer, readBody, deCompressBody, false);
......@@ -414,7 +411,6 @@ public class MessageDecoder {
return map;
}
public static byte[] encodeMessage(Message message) {
//only need flag, body, properties
byte[] body = message.getBody();
......@@ -488,9 +484,9 @@ public class MessageDecoder {
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message: messages) {
for (Message message : messages) {
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
......@@ -504,7 +500,6 @@ public class MessageDecoder {
return allBytes;
}
public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
//TO DO add a callback for processing, avoid creating lists
List<Message> msgs = new ArrayList<Message>();
......
......@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message;
......@@ -23,13 +23,11 @@ public class MessageExtBatch extends MessageExt {
private static final long serialVersionUID = -2353110995348498537L;
public ByteBuffer wrap() {
assert getBody() != null;
return ByteBuffer.wrap(getBody(), 0, getBody().length);
return ByteBuffer.wrap(getBody(), 0, getBody().length);
}
private ByteBuffer encodedBuff;
public ByteBuffer getEncodedBuff() {
......
......@@ -162,7 +162,6 @@ public class RequestCode {
*/
public static final int GET_NAMESRV_CONFIG = 319;
public static final int SEND_BATCH_MESSAGE = 320;
public static final int QUERY_CONSUME_QUEUE = 321;
......
......@@ -35,7 +35,6 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
public String getTopic() {
......
......@@ -54,7 +54,6 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNullable
private boolean m; //batch
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
......
......@@ -58,6 +58,5 @@ public class UnregisterClientRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
}
......@@ -25,7 +25,6 @@ public class UnregisterClientResponseHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
}
......@@ -22,7 +22,6 @@ package org.apache.rocketmq.common.protocol.heartbeat;
/**
* Message model
*
*/
public enum MessageModel {
/**
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.common.sysflag;
public class TopicSysFlag {
private final static int FLAG_UNIT = 0x1 << 0;
......
......@@ -90,13 +90,7 @@ public class HttpTinyClient {
}
/**
* @param url
* @param headers
* @param paramValues
* @param encoding
* @param readTimeoutMs
* @return the http response of given http post request
* @throws java.io.IOException
*/
static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
String encoding, long readTimeoutMs) throws IOException {
......
......@@ -60,7 +60,7 @@ public class IOTinyUtils {
BufferedReader reader = toBufferedReader(input);
List<String> list = new ArrayList<String>();
String line;
for (;;) {
for (; ; ) {
line = reader.readLine();
if (null != line) {
list.add(line);
......@@ -114,7 +114,6 @@ public class IOTinyUtils {
fileOrDir.delete();
}
public static void cleanDirectory(File directory) throws IOException {
if (!directory.exists()) {
String message = directory + " does not exist";
......
......@@ -18,6 +18,7 @@
package org.apache.rocketmq.common;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class MQVersionTest {
......@@ -39,7 +40,6 @@ public class MQVersionTest {
assertThat(MQVersion.value2Version(0)).isEqualTo(MQVersion.Version.V3_0_0_SNAPSHOT);
}
@Test
public void testValue2Version_HigherVersion() throws Exception {
assertThat(MQVersion.value2Version(Integer.MAX_VALUE)).isEqualTo(MQVersion.Version.HIGHER_VERSION);
......
......@@ -25,7 +25,6 @@ import org.junit.Test;
public class MessageBatchTest {
public List<Message> generateMessages() {
List<Message> messages = new ArrayList<Message>();
Message message1 = new Message("topic1", "body".getBytes());
......@@ -37,32 +36,34 @@ public class MessageBatchTest {
}
@Test
public void testGenerate_OK() throws Exception{
public void testGenerate_OK() throws Exception {
List<Message> messages = generateMessages();
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_DiffTopic() throws Exception{
public void testGenerate_DiffTopic() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setTopic("topic2");
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_DiffWaitOK() throws Exception{
public void testGenerate_DiffWaitOK() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setWaitStoreMsgOK(false);
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_Delay() throws Exception{
public void testGenerate_Delay() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setDelayTimeLevel(1);
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_Retry() throws Exception{
public void testGenerate_Retry() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
MessageBatch.generateFromList(messages);
......
......@@ -35,7 +35,7 @@ public class FilterAPITest {
FilterAPI.buildSubscriptionData(group, topic, subString);
assertThat(subscriptionData.getTopic()).isEqualTo(topic);
assertThat(subscriptionData.getSubString()).isEqualTo(subString);
String [] tags = subString.split("\\|\\|");
String[] tags = subString.split("\\|\\|");
Set<String> tagSet = new HashSet<String>();
for (String tag : tags) {
tagSet.add(tag.trim());
......
......@@ -16,28 +16,28 @@
-->
<java>
<debug>false</debug>
<debug>false</debug>
<javahome>${JAVA_HOME}</javahome>
<javahome>${JAVA_HOME}</javahome>
<jvmtype>server</jvmtype>
<jvmtype>server</jvmtype>
<mainclass>org.apache.rocketmq.tools.command.MQAdminStartup</mainclass>
<mainclass>org.apache.rocketmq.tools.command.MQAdminStartup</mainclass>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<classpaths>
</classpaths>
<classpaths>
</classpaths>
<options>
<-Xms512m></-Xms512m>
<-Xmx1g></-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
<options>
<-Xms512m></-Xms512m>
<-Xmx1g></-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
......@@ -16,28 +16,30 @@
-->
<java>
<debug>false</debug>
<debug>false</debug>
<javahome>${JAVA_HOME}</javahome>
<javahome>${JAVA_HOME}</javahome>
<jvmtype>server</jvmtype>
<jvmtype>server</jvmtype>
<mainclass>org.apache.rocketmq.broker.BrokerStartup</mainclass>
<mainclass>org.apache.rocketmq.broker.BrokerStartup</mainclass>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<classpaths>
</classpaths>
<classpaths>
</classpaths>
<options>
<-Xms512m></-Xms512m>
<-Xmx1g></-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
<options>
<-Xms512m>
</-Xms512m>
<-Xmx1g>
</-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
......@@ -16,28 +16,30 @@
-->
<java>
<debug>false</debug>
<debug>false</debug>
<javahome>${JAVA_HOME}</javahome>
<javahome>${JAVA_HOME}</javahome>
<jvmtype>server</jvmtype>
<jvmtype>server</jvmtype>
<mainclass>org.apache.rocketmq.filtersrv.FiltersrvStartup</mainclass>
<mainclass>org.apache.rocketmq.filtersrv.FiltersrvStartup</mainclass>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<classpaths>
</classpaths>
<classpaths>
</classpaths>
<options>
<-Xms512m></-Xms512m>
<-Xmx1g></-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
<options>
<-Xms512m>
</-Xms512m>
<-Xmx1g>
</-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
......@@ -16,28 +16,30 @@
-->
<java>
<debug>false</debug>
<debug>false</debug>
<javahome>${JAVA_HOME}</javahome>
<javahome>${JAVA_HOME}</javahome>
<jvmtype>server</jvmtype>
<jvmtype>server</jvmtype>
<mainclass>org.apache.rocketmq.namesrv.NamesrvStartup</mainclass>
<mainclass>org.apache.rocketmq.namesrv.NamesrvStartup</mainclass>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<classpaths>
</classpaths>
<classpaths>
</classpaths>
<options>
<-Xms512m></-Xms512m>
<-Xmx1g></-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
<options>
<-Xms512m>
</-Xms512m>
<-Xmx1g>
</-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
......
......@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
......
......@@ -233,7 +233,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
......@@ -350,8 +350,8 @@
</logger>
<logger name="RocketmqConsole" additivity="false">
<level value="INFO" />
<appender-ref ref="STDOUT" />
<level value="INFO"/>
<appender-ref ref="STDOUT"/>
</logger>
<root>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册