提交 853b167b 编写于 作者: Y yukon

ROCKETMQ-18 Fix the conflicts between code and check style.

上级 388ba7a5
......@@ -205,7 +205,7 @@ public class BrokerController {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
this.brokerStats = new BrokerStats((DefaultMessageStore)this.messageStore);
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
......@@ -219,7 +219,7 @@ public class BrokerController {
if (result) {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig)this.nettyServerConfig.clone();
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
......
......@@ -178,7 +178,7 @@ public class BrokerStartup {
}
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
......
......@@ -68,7 +68,7 @@ public class ClientChannelInfo {
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
result = prime * result + ((language == null) ? 0 : language.hashCode());
result = prime * result + (int)(lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
result = prime * result + version;
return result;
}
......@@ -81,7 +81,7 @@ public class ClientChannelInfo {
return false;
if (getClass() != obj.getClass())
return false;
ClientChannelInfo other = (ClientChannelInfo)obj;
ClientChannelInfo other = (ClientChannelInfo) obj;
if (channel == null) {
if (other.channel != null)
return false;
......
......@@ -39,8 +39,8 @@ public class BrokerFastFailure {
public static RequestTask castRunnable(final Runnable runnable) {
try {
FutureTaskExt object = (FutureTaskExt)runnable;
return (RequestTask)object.getRunnable();
FutureTaskExt object = (FutureTaskExt) runnable;
return (RequestTask) object.getRunnable();
} catch (Throwable e) {
log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
}
......
......@@ -32,7 +32,7 @@ public class ManyPullRequest {
public synchronized List<PullRequest> cloneListAndClear() {
if (!this.pullRequestList.isEmpty()) {
List<PullRequest> result = (ArrayList<PullRequest>)this.pullRequestList.clone();
List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
this.pullRequestList.clear();
return result;
}
......
......@@ -169,7 +169,7 @@ public class BrokerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader)response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
......
......@@ -34,7 +34,7 @@ public final class MessageStoreFactory {
String pluginClass = pluginClasses[i];
try {
@SuppressWarnings("unchecked")
Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>)Class.forName(pluginClass);
Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
messageStore = construct.newInstance(context, messageStore);
} catch (Throwable e) {
......
......@@ -283,12 +283,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2)request
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
(SendMessageRequestHeader)request
(SendMessageRequestHeader) request
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
......@@ -305,7 +305,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
try {
if (response != null) {
final SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader)response.readCustomHeader();
(SendMessageResponseHeader) response.readCustomHeader();
context.setMsgId(responseHeader.getMsgId());
context.setQueueId(responseHeader.getQueueId());
context.setQueueOffset(responseHeader.getQueueOffset());
......
......@@ -202,7 +202,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader)request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
......@@ -237,7 +237,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
(DeleteTopicRequestHeader)request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
......@@ -318,7 +318,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader)response.readCustomHeader();
final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader();
String content = this.brokerController.getConfiguration().getAllConfigsFormatString();
if (content != null && content.length() > 0) {
......@@ -342,9 +342,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader)response.readCustomHeader();
final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader)request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
......@@ -358,9 +358,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader)response.readCustomHeader();
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader)request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
(GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
......@@ -373,9 +373,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader)response.readCustomHeader();
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader)request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
......@@ -387,9 +387,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader)response.readCustomHeader();
final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
final GetEarliestMsgStoretimeRequestHeader requestHeader =
(GetEarliestMsgStoretimeRequestHeader)request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
(GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
long timestamp =
this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId());
......@@ -491,7 +491,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteSubscriptionGroupRequestHeader requestHeader =
(DeleteSubscriptionGroupRequestHeader)request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
(DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
......@@ -505,7 +505,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetTopicStatsInfoRequestHeader requestHeader =
(GetTopicStatsInfoRequestHeader)request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
(GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
final String topic = requestHeader.getTopic();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
......@@ -553,7 +553,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerConnectionListRequestHeader requestHeader =
(GetConsumerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
(GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
......@@ -592,7 +592,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetProducerConnectionListRequestHeader requestHeader =
(GetProducerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
(GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
ProducerConnection bodydata = new ProducerConnection();
HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
......@@ -625,7 +625,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumeStatsRequestHeader requestHeader =
(GetConsumeStatsRequestHeader)request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
(GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
......@@ -733,7 +733,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
String content = ((DefaultMessageStore)this.brokerController.getMessageStore()).getScheduleMessageService().encode();
String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
......@@ -759,7 +759,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce());
......@@ -776,7 +776,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final GetConsumerStatusRequestHeader requestHeader =
(GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
......@@ -788,7 +788,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryTopicConsumeByWhoRequestHeader requestHeader =
(QueryTopicConsumeByWhoRequestHeader)request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
(QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic());
......@@ -809,9 +809,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader)response.readCustomHeader();
final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
final RegisterFilterServerRequestHeader requestHeader =
(RegisterFilterServerRequestHeader)request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
(RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
......@@ -826,7 +826,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryConsumeTimeSpanRequestHeader requestHeader =
(QueryConsumeTimeSpanRequestHeader)request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
(QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
final String topic = requestHeader.getTopic();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
......@@ -916,7 +916,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
*/
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(),
requestHeader.getClientId());
......@@ -925,7 +925,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryCorrectionOffsetHeader requestHeader =
(QueryCorrectionOffsetHeader)request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
(QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
.queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
......@@ -950,7 +950,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader)request
final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
......@@ -976,7 +976,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
(CloneGroupOffsetRequestHeader)request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
Set<String> topics;
if (UtilAll.isBlank(requestHeader.getTopic())) {
......@@ -1018,9 +1018,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader)request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DefaultMessageStore messageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) {
......@@ -1068,7 +1068,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
GetConsumeStatsInBrokerHeader requestHeader =
(GetConsumeStatsInBrokerHeader)request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
(GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
boolean isOrder = requestHeader.isOrder();
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups =
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
......@@ -1185,7 +1185,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
DefaultMessageStore defaultMessageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs()));
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false));
......
......@@ -125,7 +125,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
final UnregisterClientRequestHeader requestHeader =
(UnregisterClientRequestHeader)request
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
......
......@@ -72,7 +72,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
final GetConsumerListByGroupRequestHeader requestHeader =
(GetConsumerListByGroupRequestHeader)request
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo =
......@@ -106,7 +106,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader)request
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
......@@ -120,9 +120,9 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader)response.readCustomHeader();
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader)request
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
long offset =
......
......@@ -49,7 +49,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
(EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
......
......@@ -81,9 +81,9 @@ public class PullMessageProcessor implements NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
......@@ -335,7 +335,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int)(this.brokerController.getMessageStore().now() - beginTimeMills));
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
try {
......
......@@ -72,9 +72,9 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
final QueryMessageResponseHeader responseHeader =
(QueryMessageResponseHeader)response.readCustomHeader();
(QueryMessageResponseHeader) response.readCustomHeader();
final QueryMessageRequestHeader requestHeader =
(QueryMessageRequestHeader)request
(QueryMessageRequestHeader) request
.decodeCommandCustomHeader(QueryMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
......@@ -127,7 +127,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ViewMessageRequestHeader requestHeader =
(ViewMessageRequestHeader)request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
......
......@@ -89,7 +89,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
(ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
......@@ -244,7 +244,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
response.setOpaque(request.getOpaque());
......@@ -415,7 +415,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
......@@ -426,7 +426,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -22,7 +22,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
/**
* Client Common configuration
*
*/
public class ClientConfig {
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
......
......@@ -24,19 +24,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Base interface for MQ management
*
*/
public interface MQAdmin {
/**
* Creates an topic
*
* @param key
* accesskey
* @param newTopic
* topic name
* @param queueNum
* topic's queue number
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @throws MQClientException
*/
void createTopic(final String key, final String newTopic, final int queueNum)
......@@ -45,15 +40,10 @@ public interface MQAdmin {
/**
* Creates an topic
*
* @param key
* accesskey
* @param newTopic
* topic name
* @param queueNum
* topic's queue number
* @param topicSysFlag
* topic system flag
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @param topicSysFlag topic system flag
* @throws MQClientException
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
......@@ -63,13 +53,9 @@ public interface MQAdmin {
* Gets the message queue offset according to some time in milliseconds<br>
* be cautious to call because of more IO overhead
*
* @param mq
* Instance of MessageQueue
* @param timestamp
* from when in milliseconds.
*
* @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return offset
*
* @throws MQClientException
*/
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
......@@ -77,11 +63,8 @@ public interface MQAdmin {
/**
* Gets the max offset
*
* @param mq
* Instance of MessageQueue
*
* @param mq Instance of MessageQueue
* @return the max offset
*
* @throws MQClientException
*/
long maxOffset(final MessageQueue mq) throws MQClientException;
......@@ -89,11 +72,8 @@ public interface MQAdmin {
/**
* Gets the minimum offset
*
* @param mq
* Instance of MessageQueue
*
* @param mq Instance of MessageQueue
* @return the minimum offset
*
* @throws MQClientException
*/
long minOffset(final MessageQueue mq) throws MQClientException;
......@@ -101,11 +81,8 @@ public interface MQAdmin {
/**
* Gets the earliest stored message time
*
* @param mq
* Instance of MessageQueue
*
* @param mq Instance of MessageQueue
* @return the time in microseconds
*
* @throws MQClientException
*/
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
......@@ -113,11 +90,8 @@ public interface MQAdmin {
/**
* Query message according tto message id
*
* @param offsetMsgId
* message id
*
* @param offsetMsgId message id
* @return message
*
* @throws InterruptedException
* @throws MQBrokerException
* @throws RemotingException
......@@ -129,19 +103,12 @@ public interface MQAdmin {
/**
* Query messages
*
* @param topic
* message topic
* @param key
* message key index word
* @param maxNum
* max message number
* @param begin
* from when
* @param end
* to when
*
* @param topic message topic
* @param key message key index word
* @param maxNum max message number
* @param begin from when
* @param end to when
* @return Instance of QueryResult
*
* @throws MQClientException
* @throws InterruptedException
*/
......@@ -149,7 +116,6 @@ public interface MQAdmin {
final long end) throws MQClientException, InterruptedException;
/**
* @param topic
* @param msgId
* @return The {@code MessageExt} of given msgId
......
......@@ -28,7 +28,6 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
/**
* Common Validator
*
*/
public class Validators {
public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
......@@ -38,7 +37,6 @@ public class Validators {
/**
* @param origin
* @param patternStr
*
* @return The resulting {@code String}
*/
public static String getGroupWithRegularExpression(String origin, String patternStr) {
......@@ -54,7 +52,6 @@ public class Validators {
* Validate group
*
* @param group
*
* @throws MQClientException
*/
public static void checkGroup(String group) throws MQClientException {
......@@ -74,9 +71,7 @@ public class Validators {
/**
* @param origin
* @param pattern
*
* @return <tt>true</tt> if, and only if, the entire origin sequence
* matches this matcher's pattern
* @return <tt>true</tt> if, and only if, the entire origin sequence matches this matcher's pattern
*/
public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
if (pattern == null) {
......@@ -91,7 +86,6 @@ public class Validators {
*
* @param msg
* @param defaultMQProducer
*
* @throws MQClientException
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
......@@ -120,7 +114,6 @@ public class Validators {
* Validate topic
*
* @param topic
*
* @throws MQClientException
*/
public static void checkTopic(String topic) throws MQClientException {
......
......@@ -35,7 +35,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Default pulling consumer
*
*/
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
......
......@@ -26,7 +26,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Message queue consumer interface
*
*/
public interface MQConsumer extends MQAdmin {
/**
......@@ -34,7 +33,6 @@ public interface MQConsumer extends MQAdmin {
*
* @param msg
* @param delayLevel
*
* @throws InterruptedException
* @throws MQBrokerException
* @throws RemotingException
......@@ -50,7 +48,6 @@ public interface MQConsumer extends MQAdmin {
* @param msg
* @param delayLevel
* @param brokerName
*
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
......@@ -62,11 +59,8 @@ public interface MQConsumer extends MQAdmin {
/**
* Fetch message queues from consumer cache according to the topic
*
* @param topic
* message topic
*
* @param topic message topic
* @return queue set
*
* @throws MQClientException
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
......
......@@ -25,7 +25,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Pulling consumer interface
*
*/
public interface MQPullConsumer extends MQConsumer {
/**
......@@ -51,18 +50,12 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Pulling the messages,not blocking
*
* @param mq
* from which message queue
* @param subExpression
* subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* @param offset
* from where to pull
* @param maxNums
* max pulling numbers
*
* @param mq from which message queue
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if null or * expression,meaning subscribe
* all
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
*
* @throws MQClientException
* @throws InterruptedException
* @throws MQBrokerException
......@@ -80,9 +73,7 @@ public interface MQPullConsumer extends MQConsumer {
* @param offset
* @param maxNums
* @param timeout
*
* @return The resulting {@code PullRequest}
*
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
......@@ -100,7 +91,6 @@ public interface MQPullConsumer extends MQConsumer {
* @param offset
* @param maxNums
* @param pullCallback
*
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
......@@ -118,7 +108,6 @@ public interface MQPullConsumer extends MQConsumer {
* @param maxNums
* @param pullCallback
* @param timeout
*
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
......@@ -134,9 +123,7 @@ public interface MQPullConsumer extends MQConsumer {
* @param subExpression
* @param offset
* @param maxNums
*
* @return The resulting {@code PullRequest}
*
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
......@@ -154,7 +141,6 @@ public interface MQPullConsumer extends MQConsumer {
* @param offset
* @param maxNums
* @param pullCallback
*
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
......@@ -168,7 +154,6 @@ public interface MQPullConsumer extends MQConsumer {
*
* @param mq
* @param offset
*
* @throws MQClientException
*/
void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException;
......@@ -178,9 +163,7 @@ public interface MQPullConsumer extends MQConsumer {
*
* @param mq
* @param fromStore
*
* @return The fetched offset of given queue
*
* @throws MQClientException
*/
long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException;
......@@ -188,11 +171,8 @@ public interface MQPullConsumer extends MQConsumer {
/**
* Fetch the message queues according to the topic
*
* @param topic
* message topic
*
* @param topic message topic
* @return message queue set
*
* @throws MQClientException
*/
Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException;
......@@ -205,7 +185,6 @@ public interface MQPullConsumer extends MQConsumer {
* @param delayLevel
* @param brokerName
* @param consumerGroup
*
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
......
......@@ -23,7 +23,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
/**
* Push consumer
*
*/
public interface MQPushConsumer extends MQConsumer {
/**
......@@ -54,11 +53,8 @@ public interface MQPushConsumer extends MQConsumer {
* Subscribe some topic
*
* @param topic
* @param subExpression
* subscription expression.it only support or operation such as
* "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if null or * expression,meaning subscribe
* all
* @throws MQClientException
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
......@@ -67,13 +63,8 @@ public interface MQPushConsumer extends MQConsumer {
* Subscribe some topic
*
* @param topic
* @param fullClassName
* full class name,must extend
* org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource
* class source code,used UTF-8 file encoding,must be responsible
* for your code safety
*
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
* @throws MQClientException
*/
void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
......@@ -81,8 +72,7 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Unsubscribe consumption some topic
*
* @param topic
* message topic
* @param topic message topic
*/
void unsubscribe(final String topic);
......
......@@ -21,16 +21,12 @@ import org.apache.rocketmq.common.message.MessageQueue;
/**
* A MessageQueueListener is implemented by the application and may be specified when a message queue changed
*
*/
public interface MessageQueueListener {
/**
* @param topic
* message topic
* @param mqAll
* all queues in this message topic
* @param mqDivided
* collection of queues,assigned to the current consumer
* @param topic message topic
* @param mqAll all queues in this message topic
* @param mqDivided collection of queues,assigned to the current consumer
*/
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
......
......@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer;
/**
* Async message pulling interface
*
*/
public interface PullCallback {
void onSuccess(final PullResult pullResult);
......
......@@ -20,7 +20,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
/**
* Consumer Orderly consumption context
*
*/
public class ConsumeOrderlyContext {
private final MessageQueue messageQueue;
......
......@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer.listener;
/**
* A MessageListener object is used to receive asynchronously delivered messages.
*
*/
public interface MessageListener {
}
......@@ -21,17 +21,13 @@ import org.apache.rocketmq.common.message.MessageExt;
/**
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
*
*/
public interface MessageListenerConcurrently extends MessageListener {
/**
* It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
*
* @param msgs
* msgs.size() >= 1<br>
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param context
*
* @return The consume status
*/
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
......
......@@ -21,17 +21,13 @@ import org.apache.rocketmq.common.message.MessageExt;
/**
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread
*
*/
public interface MessageListenerOrderly extends MessageListener {
/**
* It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure
*
* @param msgs
* msgs.size() >= 1<br>
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param context
*
* @return The consume status
*/
ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
......
......@@ -23,7 +23,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
/**
* Wrapper class for offset serialization
*
*/
public class OffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
......
......@@ -25,7 +25,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Offset store interface
*
*/
public interface OffsetStore {
/**
......@@ -49,7 +48,6 @@ public interface OffsetStore {
*
* @param mq
* @param type
*
* @return The fetched offset
*/
long readOffset(final MessageQueue mq, final ReadOffsetType type);
......@@ -77,13 +75,11 @@ public interface OffsetStore {
/**
* @param topic
*
* @return The cloned offset table of given topic
*/
Map<MessageQueue, Long> cloneOffsetTable(String topic);
/**
*
* @param mq
* @param offset
* @param isOneway
......
......@@ -84,7 +84,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader)request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
......@@ -110,7 +110,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
try {
final NotifyConsumerIdsChangedRequestHeader requestHeader =
(NotifyConsumerIdsChangedRequestHeader)request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
(NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getConsumerGroup());
......@@ -123,7 +123,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
new Object[] {
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
......@@ -141,7 +141,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerStatusRequestHeader requestHeader =
(GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
GetConsumerStatusBody body = new GetConsumerStatusBody();
......@@ -154,7 +154,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (null != consumerRunningInfo) {
......@@ -177,7 +177,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
(ConsumeMessageDirectlyResultRequestHeader)request
(ConsumeMessageDirectlyResultRequestHeader) request
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
......
......@@ -307,7 +307,7 @@ public class MQAdminImpl {
QueryMessageResponseHeader responseHeader = null;
try {
responseHeader =
(QueryMessageResponseHeader)response
(QueryMessageResponseHeader) response
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
} catch (RemotingCommandException e) {
log.error("decodeCommandCustomHeader exception", e);
......
......@@ -505,7 +505,7 @@ public class MQClientAPIImpl {
}
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader)response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
......@@ -622,7 +622,7 @@ public class MQClientAPIImpl {
}
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
......@@ -664,7 +664,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader)response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
(SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return responseHeader.getOffset();
}
default:
......@@ -687,7 +687,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader)response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
(GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
return responseHeader.getOffset();
}
......@@ -738,7 +738,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader)response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
(GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return responseHeader.getOffset();
}
......@@ -762,7 +762,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader)response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
(GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return responseHeader.getTimestamp();
}
......@@ -786,7 +786,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader)response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset();
}
......@@ -1249,7 +1249,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
WipeWritePermOfBrokerResponseHeader responseHeader =
(WipeWritePermOfBrokerResponseHeader)response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
(WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
return responseHeader.getWipeTopicCount();
}
default:
......@@ -1331,7 +1331,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetKVConfigResponseHeader responseHeader =
(GetKVConfigResponseHeader)response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
(GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
return responseHeader.getValue();
}
default:
......
......@@ -581,11 +581,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
......@@ -1024,9 +1024,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long computeAccTotal = this.computeAccumulationTotal();
long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
long incThreshold = (long)(adjustThreadPoolNumsThreshold * 1.0);
long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
long decThreshold = (long)(adjustThreadPoolNumsThreshold * 0.8);
long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
if (computeAccTotal >= incThreshold) {
this.consumeMessageService.incCorePoolSize();
......
......@@ -26,7 +26,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
/**
* Consumer inner interface
*
*/
public interface MQConsumerInner {
String groupName();
......
......@@ -21,7 +21,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
/**
* Message lock,strictly ensure the single queue only one thread at a time consuming
*
*/
public class MessageQueueLock {
private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
......
......@@ -35,7 +35,6 @@ import org.slf4j.Logger;
/**
* Queue consumption snapshot
*
*/
public class ProcessQueue {
public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
......@@ -69,8 +68,6 @@ public class ProcessQueue {
}
/**
*
* @param pushConsumer
*/
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
......
......@@ -66,7 +66,7 @@ public class PullAPIWrapper {
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt)pullResult;
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
......
......@@ -71,7 +71,7 @@ public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer;
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
......
......@@ -74,7 +74,7 @@ public class PullRequest {
return false;
if (getClass() != obj.getClass())
return false;
PullRequest other = (PullRequest)obj;
PullRequest other = (PullRequest) obj;
if (consumerGroup == null) {
if (other.consumerGroup != null)
return false;
......
......@@ -23,7 +23,6 @@ import org.slf4j.Logger;
/**
* Rebalance Service
*
*/
public class RebalanceService extends ServiceThread {
private static long waitInterval =
......
......@@ -436,7 +436,7 @@ public class MQClientInstance {
if (impl != null) {
try {
if (impl instanceof DefaultMQPushConsumerImpl) {
DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl)impl;
DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
dmq.adjustThreadPool();
}
} catch (Exception e) {
......@@ -1026,7 +1026,7 @@ public class MQClientInstance {
try {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl)impl;
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
log.info("[reset-offset] consumer dose not exist. group={}", group);
return;
......@@ -1071,10 +1071,10 @@ public class MQClientInstance {
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)impl;
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
return consumer.getOffsetStore().cloneOffsetTable(topic);
} else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl)impl;
DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
return consumer.getOffsetStore().cloneOffsetTable(topic);
} else {
return Collections.EMPTY_MAP;
......@@ -1118,7 +1118,7 @@ public class MQClientInstance {
final String brokerName) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)mqConsumerInner;
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
return result;
......
......@@ -112,7 +112,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer)this.defaultMQProducer;
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(//
producer.getCheckThreadPoolMinSize(), //
......@@ -238,7 +238,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer)defaultMQProducer;
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionCheckListener();
}
......@@ -538,7 +538,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
......
......@@ -137,8 +137,8 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public int hashCode() {
int result = getName() != null ? getName().hashCode() : 0;
result = 31 * result + (int)(getCurrentLatency() ^ (getCurrentLatency() >>> 32));
result = 31 * result + (int)(getStartTimestamp() ^ (getStartTimestamp() >>> 32));
result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
return result;
}
......@@ -149,7 +149,7 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
if (!(o instanceof FaultItem))
return false;
final FaultItem faultItem = (FaultItem)o;
final FaultItem faultItem = (FaultItem) o;
if (getCurrentLatency() != faultItem.getCurrentLatency())
return false;
......
......@@ -62,23 +62,23 @@ public class ConsumerStatsManager {
}
public void incPullRT(final String group, final String topic, final long rt) {
this.topicAndGroupPullRT.addValue(topic + "@" + group, (int)rt, 1);
this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
}
public void incPullTPS(final String group, final String topic, final long msgs) {
this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int)msgs, 1);
this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1);
}
public void incConsumeRT(final String group, final String topic, final long rt) {
this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int)rt, 1);
this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
}
public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int)msgs, 1);
this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1);
}
public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int)msgs, 1);
this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1);
}
public ConsumeStatus consumeStatus(final String group, final String topic) {
......
......@@ -119,7 +119,6 @@ public class Configuration {
*
* @param object
* @param fieldName
*
* @throws java.lang.RuntimeException if the field of object is not exist.
*/
public void setStorePathFromConfig(Object object, String fieldName) {
......@@ -156,7 +155,7 @@ public class Configuration {
if (this.storePathFromConfig) {
try {
realStorePath = (String)storePathField.get(this.storePathObject);
realStorePath = (String) storePathField.get(this.storePathObject);
} catch (IllegalAccessException e) {
log.error("getStorePath error, ", e);
}
......
......@@ -176,7 +176,7 @@ public class CountDownLatch2 {
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
for (;;) {
int c = getState();
if (c == 0)
return false;
......
......@@ -56,7 +56,7 @@ public class DataVersion extends RemotingSerializable {
if (o == null || getClass() != o.getClass())
return false;
final DataVersion that = (DataVersion)o;
final DataVersion that = (DataVersion) o;
if (timestatmp != that.timestatmp)
return false;
......@@ -66,7 +66,7 @@ public class DataVersion extends RemotingSerializable {
@Override
public int hashCode() {
int result = (int)(timestatmp ^ (timestatmp >>> 32));
int result = (int) (timestatmp ^ (timestatmp >>> 32));
result = 31 * result + (counter != null ? counter.hashCode() : 0);
return result;
}
......
......@@ -186,7 +186,7 @@ public class MixAll {
public static final String file2String(final File file) {
if (file.exists()) {
char[] data = new char[(int)file.length()];
char[] data = new char[(int) file.length()];
boolean result = false;
FileReader fileReader = null;
......@@ -442,7 +442,7 @@ public class MixAll {
int unit = si ? 1000 : 1024;
if (bytes < unit)
return bytes + " B";
int exp = (int)(Math.log(bytes) / Math.log(unit));
int exp = (int) (Math.log(bytes) / Math.log(unit));
String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
}
......
......@@ -151,7 +151,7 @@ public class TopicConfig {
if (o == null || getClass() != o.getClass())
return false;
final TopicConfig that = (TopicConfig)o;
final TopicConfig that = (TopicConfig) o;
if (readQueueNums != that.readQueueNums)
return false;
......
......@@ -193,7 +193,7 @@ public class UtilAll {
long freeSpace = file.getFreeSpace();
long usedSpace = totalSpace - freeSpace;
if (totalSpace > 0) {
return usedSpace / (double)totalSpace;
return usedSpace / (double) totalSpace;
}
} catch (Exception e) {
return -1;
......@@ -213,7 +213,7 @@ public class UtilAll {
public static final int crc32(byte[] array, int offset, int length) {
CRC32 crc32 = new CRC32();
crc32.update(array, offset, length);
return (int)(crc32.getValue() & 0x7FFFFFFF);
return (int) (crc32.getValue() & 0x7FFFFFFF);
}
public static String bytes2string(byte[] src) {
......@@ -236,13 +236,13 @@ public class UtilAll {
byte[] d = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
d[i] = (byte)(charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return d;
}
private static byte charToByte(char c) {
return (byte)"0123456789ABCDEF".indexOf(c);
return (byte) "0123456789ABCDEF".indexOf(c);
}
public static byte[] uncompress(final byte[] src) throws IOException {
......@@ -400,15 +400,15 @@ public class UtilAll {
//10.0.0.0~10.255.255.255
//172.16.0.0~172.31.255.255
//192.168.0.0~192.168.255.255
if (ip[0] == (byte)10) {
if (ip[0] == (byte) 10) {
return true;
} else if (ip[0] == (byte)172) {
if (ip[1] >= (byte)16 && ip[1] <= (byte)31) {
} else if (ip[0] == (byte) 172) {
if (ip[1] >= (byte) 16 && ip[1] <= (byte) 31) {
return true;
}
} else if (ip[0] == (byte)192) {
if (ip[1] == (byte)168) {
} else if (ip[0] == (byte) 192) {
if (ip[1] == (byte) 168) {
return true;
}
}
......@@ -423,27 +423,27 @@ public class UtilAll {
// if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) {
// }
if (ip[0] >= (byte)1 && ip[0] <= (byte)126) {
if (ip[1] == (byte)1 && ip[2] == (byte)1 && ip[3] == (byte)1) {
if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) {
if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) {
return false;
}
if (ip[1] == (byte)0 && ip[2] == (byte)0 && ip[3] == (byte)0) {
if (ip[1] == (byte) 0 && ip[2] == (byte) 0 && ip[3] == (byte) 0) {
return false;
}
return true;
} else if (ip[0] >= (byte)128 && ip[0] <= (byte)191) {
if (ip[2] == (byte)1 && ip[3] == (byte)1) {
} else if (ip[0] >= (byte) 128 && ip[0] <= (byte) 191) {
if (ip[2] == (byte) 1 && ip[3] == (byte) 1) {
return false;
}
if (ip[2] == (byte)0 && ip[3] == (byte)0) {
if (ip[2] == (byte) 0 && ip[3] == (byte) 0) {
return false;
}
return true;
} else if (ip[0] >= (byte)192 && ip[0] <= (byte)223) {
if (ip[3] == (byte)1) {
} else if (ip[0] >= (byte) 192 && ip[0] <= (byte) 223) {
if (ip[3] == (byte) 1) {
return false;
}
if (ip[3] == (byte)0) {
if (ip[3] == (byte) 0) {
return false;
}
return true;
......@@ -466,10 +466,10 @@ public class UtilAll {
InetAddress ip = null;
byte[] internalIP = null;
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = (NetworkInterface)allNetInterfaces.nextElement();
NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
Enumeration addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
ip = (InetAddress)addresses.nextElement();
ip = (InetAddress) addresses.nextElement();
if (ip != null && ip instanceof Inet4Address) {
byte[] ipByte = ip.getAddress();
if (ipByte.length == 4) {
......
......@@ -49,7 +49,7 @@ public class PolishExpr {
segments.add(token);
} else if (isLeftParenthesis(token)) {
operatorStack.push((Operator)token);
operatorStack.push((Operator) token);
} else if (isRightParenthesis(token)) {
Operator opNew = null;
......@@ -60,7 +60,7 @@ public class PolishExpr {
throw new IllegalArgumentException("mismatched parentheses");
} else if (isOperator(token)) {
Operator opNew = (Operator)token;
Operator opNew = (Operator) token;
if (!operatorStack.empty()) {
Operator opOld = operatorStack.peek();
if (opOld.isCompareable() && opNew.compare(opOld) != 1) {
......@@ -83,11 +83,8 @@ public class PolishExpr {
}
/**
*
* @param expression
*
* @return
*
* @throws Exception
*/
private static List<Op> participle(String expression) {
......@@ -99,7 +96,7 @@ public class PolishExpr {
Type preType = Type.NULL;
for (int i = 0; i < size; i++) {
int chValue = (int)expression.charAt(i);
int chValue = (int) expression.charAt(i);
if ((97 <= chValue && chValue <= 122) || (65 <= chValue && chValue <= 90)
|| (49 <= chValue && chValue <= 57) || 95 == chValue) {
......@@ -129,7 +126,7 @@ public class PolishExpr {
}
preType = Type.PARENTHESIS;
segments.add(createOperator((char)chValue + ""));
segments.add(createOperator((char) chValue + ""));
} else if (38 == chValue || 124 == chValue) {
if (Type.OPERAND == preType || Type.SEPAERATOR == preType || Type.PARENTHESIS == preType) {
......@@ -157,7 +154,7 @@ public class PolishExpr {
preType = Type.SEPAERATOR;
} else {
throw new IllegalArgumentException("illegal expression, at index " + i + " " + (char)chValue);
throw new IllegalArgumentException("illegal expression, at index " + i + " " + (char) chValue);
}
}
......@@ -173,11 +170,11 @@ public class PolishExpr {
}
public static boolean isLeftParenthesis(Op token) {
return token instanceof Operator && LEFTPARENTHESIS == (Operator)token;
return token instanceof Operator && LEFTPARENTHESIS == (Operator) token;
}
public static boolean isRightParenthesis(Op token) {
return token instanceof Operator && RIGHTPARENTHESIS == (Operator)token;
return token instanceof Operator && RIGHTPARENTHESIS == (Operator) token;
}
public static boolean isOperator(Op token) {
......
......@@ -64,10 +64,10 @@ public class MessageClientIDSetter {
public static Date getNearlyTimeFromID(String msgID) {
ByteBuffer buf = ByteBuffer.allocate(8);
byte[] bytes = UtilAll.string2bytes(msgID);
buf.put((byte)0);
buf.put((byte)0);
buf.put((byte)0);
buf.put((byte)0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put(bytes, 10, 4);
buf.position(0);
long spanMS = buf.getLong();
......@@ -113,8 +113,8 @@ public class MessageClientIDSetter {
setStartTime(current);
}
buffer.position(0);
buffer.putInt((int)(System.currentTimeMillis() - startTime));
buffer.putShort((short)COUNTER.getAndIncrement());
buffer.putInt((int) (System.currentTimeMillis() - startTime));
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}
......
......@@ -54,7 +54,7 @@ public class MessageDecoder {
public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
InetSocketAddress inetSocketAddress = (InetSocketAddress)socketAddress;
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress());
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.putLong(transactionIdhashCode);
......@@ -95,10 +95,10 @@ public class MessageDecoder {
public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
byte[] body = messageExt.getBody();
byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
byte topicLen = (byte)topics.length;
byte topicLen = (byte) topics.length;
String properties = messageProperties2String(messageExt.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
short propertiesLength = (short)propertiesBytes.length;
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = messageExt.getSysFlag();
byte[] newBody = messageExt.getBody();
if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
......@@ -164,7 +164,7 @@ public class MessageDecoder {
byteBuffer.putLong(bornTimeStamp);
// 10 BORNHOST
InetSocketAddress bornHost = (InetSocketAddress)messageExt.getBornHost();
InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
byteBuffer.put(bornHost.getAddress().getAddress());
byteBuffer.putInt(bornHost.getPort());
......@@ -173,7 +173,7 @@ public class MessageDecoder {
byteBuffer.putLong(storeTimestamp);
// 12 STOREHOST
InetSocketAddress serverHost = (InetSocketAddress)messageExt.getStoreHost();
InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
byteBuffer.put(serverHost.getAddress().getAddress());
byteBuffer.putInt(serverHost.getPort());
......@@ -295,7 +295,7 @@ public class MessageDecoder {
// 16 TOPIC
byte topicLen = byteBuffer.get();
byte[] topic = new byte[(int)topicLen];
byte[] topic = new byte[(int) topicLen];
byteBuffer.get(topic);
msgExt.setTopic(new String(topic, CHARSET_UTF8));
......@@ -314,7 +314,7 @@ public class MessageDecoder {
msgExt.setMsgId(msgId);
if (isClient) {
((MessageClientExt)msgExt).setOffsetMsgId(msgId);
((MessageClientExt) msgExt).setOffsetMsgId(msgId);
}
return msgExt;
......
......@@ -65,7 +65,7 @@ public class MessageExt extends Message {
}
private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress)socketAddress;
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.flip();
......@@ -119,7 +119,7 @@ public class MessageExt extends Message {
public String getBornHostString() {
if (this.bornHost != null) {
InetSocketAddress inetSocketAddress = (InetSocketAddress)this.bornHost;
InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
return inetSocketAddress.getAddress().getHostAddress();
}
......@@ -128,7 +128,7 @@ public class MessageExt extends Message {
public String getBornHostNameString() {
if (this.bornHost != null) {
InetSocketAddress inetSocketAddress = (InetSocketAddress)this.bornHost;
InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
return inetSocketAddress.getAddress().getHostName();
}
......
......@@ -76,7 +76,7 @@ public class MessageQueue implements Comparable<MessageQueue>, Serializable {
return false;
if (getClass() != obj.getClass())
return false;
MessageQueue other = (MessageQueue)obj;
MessageQueue other = (MessageQueue) obj;
if (brokerName == null) {
if (other.brokerName != null)
return false;
......
......@@ -75,7 +75,7 @@ public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializa
return false;
if (getClass() != obj.getClass())
return false;
MessageQueueForC other = (MessageQueueForC)obj;
MessageQueueForC other = (MessageQueueForC) obj;
if (brokerName == null) {
if (other.brokerName != null)
return false;
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z vintagewang@apache.org $
*
* $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -53,7 +53,7 @@ public class ConsumerRunningInfo extends RemotingSerializable {
String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
if (property == null) {
property = ((ConsumeType)prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
}
push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
}
......@@ -109,7 +109,7 @@ public class ConsumerRunningInfo extends RemotingSerializable {
String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
if (property == null) {
property = ((ConsumeType)info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
property = ((ConsumeType) info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
}
push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
}
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: CreateTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: CreateTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetBrokerConfigResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetBrokerConfigResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetEarliestMsgStoretimeRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetEarliestMsgStoretimeRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetEarliestMsgStoretimeResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetEarliestMsgStoretimeResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetMaxOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetMaxOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetMinOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetMinOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetMinOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetMinOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: PullMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: PullMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: PullMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: PullMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: QueryConsumerOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: QueryConsumerOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: QueryConsumerOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: QueryConsumerOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetMinOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetMinOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: QueryMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: QueryMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: QueryMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: QueryMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: QueryMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: QueryMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SearchOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: SearchOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SearchOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: SearchOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SendMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: SendMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SendMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: SendMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: UpdateConsumerOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: UpdateConsumerOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: UpdateConsumerOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: UpdateConsumerOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ViewMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: ViewMessageRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ViewMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: ViewMessageResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: RegisterOrderTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: RegisterOrderTopicRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: UnRegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: UnRegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ConsumeType.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: ConsumeType.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ConsumerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: ConsumerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: HeartbeatData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: HeartbeatData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: MessageModel.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: MessageModel.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ProducerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: ProducerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......@@ -124,7 +126,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
return false;
if (getClass() != obj.getClass())
return false;
SubscriptionData other = (SubscriptionData)obj;
SubscriptionData other = (SubscriptionData) obj;
if (classFilterMode != other.classFilterMode)
return false;
if (codeSet == null) {
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: BrokerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: BrokerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......@@ -78,7 +80,7 @@ public class BrokerData implements Comparable<BrokerData> {
return false;
if (getClass() != obj.getClass())
return false;
BrokerData other = (BrokerData)obj;
BrokerData other = (BrokerData) obj;
if (brokerAddrs == null) {
if (other.brokerAddrs != null)
return false;
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: QueueData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: QueueData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......@@ -81,7 +83,7 @@ public class QueueData implements Comparable<QueueData> {
return false;
if (getClass() != obj.getClass())
return false;
QueueData other = (QueueData)obj;
QueueData other = (QueueData) obj;
if (brokerName == null) {
if (other.brokerName != null)
return false;
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*
* $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
/**
......@@ -106,7 +108,7 @@ public class TopicRouteData extends RemotingSerializable {
return false;
if (getClass() != obj.getClass())
return false;
TopicRouteData other = (TopicRouteData)obj;
TopicRouteData other = (TopicRouteData) obj;
if (brokerDatas == null) {
if (other.brokerDatas != null)
return false;
......
......@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
/**
* thread safe
*
*/
public class ConcurrentTreeMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......
......@@ -22,7 +22,6 @@ import java.util.Queue;
/**
* not thread safe
*
*/
public class RoundQueue<E> {
......
......@@ -114,7 +114,7 @@ public class SubscriptionGroupConfig {
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int)(brokerId ^ (brokerId >>> 32));
result = prime * result + (int) (brokerId ^ (brokerId >>> 32));
result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
result = prime * result + (consumeEnable ? 1231 : 1237);
result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
......@@ -123,7 +123,7 @@ public class SubscriptionGroupConfig {
result = prime * result + retryMaxTimes;
result = prime * result + retryQueueNums;
result =
prime * result + (int)(whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32));
prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32));
return result;
}
......@@ -135,7 +135,7 @@ public class SubscriptionGroupConfig {
return false;
if (getClass() != obj.getClass())
return false;
SubscriptionGroupConfig other = (SubscriptionGroupConfig)obj;
SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj;
if (brokerId != other.brokerId)
return false;
if (consumeBroadcastEnable != other.consumeBroadcastEnable)
......
......@@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
public class ChannelUtil {
public static String getRemoteIp(Channel channel) {
InetSocketAddress inetSocketAddress = (InetSocketAddress)channel.remoteAddress();
InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress();
if (inetSocketAddress == null) {
return "";
}
......
......@@ -36,10 +36,10 @@ public class HttpTinyClient {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection)new URL(url).openConnection();
conn = (HttpURLConnection) new URL(url).openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout((int)readTimeoutMs);
conn.setReadTimeout((int)readTimeoutMs);
conn.setConnectTimeout((int) readTimeoutMs);
conn.setReadTimeout((int) readTimeoutMs);
setHeaders(conn, headers, encoding);
conn.connect();
......@@ -90,20 +90,12 @@ public class HttpTinyClient {
}
/**
*
* @param url
* @param headers
* @param paramValues
* @param encoding
* @param readTimeoutMs
*
* @return the http response of given http post request
*
* @throws java.io.IOException
*/
static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
......@@ -112,10 +104,10 @@ public class HttpTinyClient {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection)new URL(url).openConnection();
conn = (HttpURLConnection) new URL(url).openConnection();
conn.setRequestMethod("POST");
conn.setConnectTimeout(3000);
conn.setReadTimeout((int)readTimeoutMs);
conn.setReadTimeout((int) readTimeoutMs);
conn.setDoOutput(true);
conn.setDoInput(true);
setHeaders(conn, headers, encoding);
......
......@@ -56,14 +56,11 @@ public class IOTinyUtils {
return count;
}
/**
*/
static public List<String> readLines(Reader input) throws IOException {
BufferedReader reader = toBufferedReader(input);
List<String> list = new ArrayList<String>();
String line = null;
for (; ; ) {
String line;
for (;;) {
line = reader.readLine();
if (null != line) {
list.add(line);
......@@ -75,7 +72,7 @@ public class IOTinyUtils {
}
static private BufferedReader toBufferedReader(Reader reader) {
return reader instanceof BufferedReader ? (BufferedReader)reader : new BufferedReader(reader);
return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
}
static public void copyFile(String source, String target) throws IOException {
......
......@@ -76,9 +76,9 @@ public class Consumer {
Long[] end = snapshotList.getLast();
final long consumeTps =
(long)(((end[1] - begin[1]) / (double)(end[0] - begin[0])) * 1000L);
final double averageB2CRT = (end[2] - begin[2]) / (double)(end[1] - begin[1]);
final double averageS2CRT = (end[3] - begin[3]) / (double)(end[1] - begin[1]);
(long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
......
......@@ -81,8 +81,8 @@ public class Producer {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
final long sendTps = (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
......
......@@ -73,8 +73,8 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast();
final long sendTps =
(long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
(long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
......
......@@ -44,7 +44,7 @@ public class Producer {
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer)arg;
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
......
......@@ -60,7 +60,7 @@ public class FilterServerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterFilterServerResponseHeader responseHeader =
(RegisterFilterServerResponseHeader)response
(RegisterFilterServerResponseHeader) response
.decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
return responseHeader;
......
......@@ -120,7 +120,7 @@ public class FiltersrvStartup {
System.exit(-2);
}
LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
......
......@@ -95,7 +95,7 @@ public class DynaCode {
StringBuffer buf = new StringBuffer();
while (cl != null) {
if (cl instanceof URLClassLoader) {
URL urls[] = ((URLClassLoader)cl).getURLs();
URL urls[] = ((URLClassLoader) cl).getURLs();
for (int i = 0; i < urls.length; i++) {
if (buf.length() > 0) {
buf.append(File.pathSeparatorChar);
......
......@@ -84,7 +84,7 @@ public class FilterClassManager {
Class<?> newClass =
DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
Object newInstance = newClass.newInstance();
filterClassInfo.setMessageFilter((MessageFilter)newInstance);
filterClassInfo.setMessageFilter((MessageFilter) newInstance);
filterClassInfo.setClassCRC(classCRC);
log.info("fetch Remote class File OK, {} {}", next.getKey(),
......@@ -134,7 +134,7 @@ public class FilterClassManager {
String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
Object newInstance = newClass.newInstance();
filterClassInfoNew.setMessageFilter((MessageFilter)newInstance);
filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
filterClassInfoNew.setClassCRC(classCRC);
}
......
......@@ -85,7 +85,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final RegisterMessageFilterClassRequestHeader requestHeader =
(RegisterMessageFilterClassRequestHeader)request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
(RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
try {
boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(),
......@@ -109,9 +109,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
final FilterContext filterContext = new FilterContext();
filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
......@@ -331,10 +331,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
if (bodyLength > 0)
msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
msgStoreItemMemory.put((byte)topicLength);
msgStoreItemMemory.put((byte) topicLength);
msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
msgStoreItemMemory.putShort((short)propertiesLength);
msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
msgStoreItemMemory.put(propertiesData);
......
......@@ -102,7 +102,7 @@ public class NamesrvStartup {
System.exit(-2);
}
LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
......
......@@ -53,7 +53,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
......
......@@ -127,7 +127,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final PutKVConfigRequestHeader requestHeader =
(PutKVConfigRequestHeader)request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
(PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
this.namesrvController.getKvConfigManager().putKVConfig(
requestHeader.getNamespace(),
......@@ -142,9 +142,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader)response.readCustomHeader();
final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
final GetKVConfigRequestHeader requestHeader =
(GetKVConfigRequestHeader)request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
(GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
String value = this.namesrvController.getKvConfigManager().getKVConfig(
requestHeader.getNamespace(),
......@@ -166,7 +166,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteKVConfigRequestHeader requestHeader =
(DeleteKVConfigRequestHeader)request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
(DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
this.namesrvController.getKvConfigManager().deleteKVConfig(
requestHeader.getNamespace(),
......@@ -181,9 +181,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader();
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
......@@ -217,9 +217,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader();
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
......@@ -254,7 +254,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final UnRegisterBrokerRequestHeader requestHeader =
(UnRegisterBrokerRequestHeader)request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
(UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
this.namesrvController.getRouteInfoManager().unregisterBroker(
requestHeader.getClusterName(),
......@@ -270,7 +270,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
......@@ -308,9 +308,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader)response.readCustomHeader();
final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
final WipeWritePermOfBrokerRequestHeader requestHeader =
(WipeWritePermOfBrokerRequestHeader)request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);
(WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);
int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
......@@ -339,7 +339,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteTopicInNamesrvRequestHeader requestHeader =
(DeleteTopicInNamesrvRequestHeader)request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
(DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
......@@ -351,7 +351,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetKVListByNamespaceRequestHeader requestHeader =
(GetKVListByNamespaceRequestHeader)request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
(GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(
requestHeader.getNamespace());
......@@ -370,7 +370,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetTopicsByClusterRequestHeader requestHeader =
(GetTopicsByClusterRequestHeader)request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
(GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
......
......@@ -385,7 +385,7 @@ public class RouteInfoManager {
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData();
brokerDataClone.setBrokerName(brokerData.getBrokerName());
brokerDataClone.setBrokerAddrs((HashMap<Long, String>)brokerData
brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
......
......@@ -16,7 +16,7 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
......@@ -309,7 +309,7 @@
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>checkstyle/checkstyle.xml</configLocation>
<configLocation>style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
......
......@@ -67,7 +67,7 @@ public class RemotingHelper {
socketChannel.configureBlocking(true);
//bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
socketChannel.socket().setSoTimeout((int)timeoutMillis);
socketChannel.socket().setSoTimeout((int) timeoutMillis);
ByteBuffer byteBufferRequest = request.encode();
while (byteBufferRequest.hasRemaining()) {
......@@ -168,7 +168,7 @@ public class RemotingHelper {
if (null == channel) {
return "";
}
final InetSocketAddress remote = (InetSocketAddress)channel.remoteAddress();
final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
if (remote != null) {
return remote.getAddress().getHostName();
}
......@@ -188,7 +188,7 @@ public class RemotingHelper {
public static String parseSocketAddressName(SocketAddress socketAddress) {
final InetSocketAddress addrs = (InetSocketAddress)socketAddress;
final InetSocketAddress addrs = (InetSocketAddress) socketAddress;
if (addrs != null) {
return addrs.getAddress().getHostName();
}
......
......@@ -67,7 +67,7 @@ public class RemotingUtil {
try {
final Method method = providerClazz.getMethod("provider");
if (method != null) {
final SelectorProvider selectorProvider = (SelectorProvider)method.invoke(null);
final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null);
if (selectorProvider != null) {
result = selectorProvider.openSelector();
}
......@@ -155,7 +155,7 @@ public class RemotingUtil {
public static String socketAddress2String(final SocketAddress addr) {
StringBuilder sb = new StringBuilder();
InetSocketAddress inetSocketAddress = (InetSocketAddress)addr;
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
sb.append(inetSocketAddress.getAddress().getHostAddress());
sb.append(":");
sb.append(inetSocketAddress.getPort());
......
......@@ -21,8 +21,6 @@ import org.slf4j.LoggerFactory;
/**
* Base class for background thread
*
*
*/
public abstract class ServiceThread implements Runnable {
private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
......
......@@ -42,7 +42,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf)super.decode(ctx, in);
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
......
......@@ -637,7 +637,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent evnet = (IdleStateEvent)evt;
IdleStateEvent evnet = (IdleStateEvent) evt;
if (evnet.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
......
......@@ -171,7 +171,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
......@@ -337,7 +337,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent evnet = (IdleStateEvent)evt;
IdleStateEvent evnet = (IdleStateEvent) evt;
if (evnet.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
......
......@@ -21,8 +21,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
* Common remoting command processor
*
*
*/
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
......
......@@ -133,6 +133,6 @@ public class NettyServerConfig implements Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
return (NettyServerConfig)super.clone();
return (NettyServerConfig) super.clone();
}
}
......@@ -36,7 +36,7 @@ public class RequestTask implements Runnable {
@Override
public int hashCode() {
int result = runnable != null ? runnable.hashCode() : 0;
result = 31 * result + (int)(getCreateTimestamp() ^ (getCreateTimestamp() >>> 32));
result = 31 * result + (int) (getCreateTimestamp() ^ (getCreateTimestamp() >>> 32));
result = 31 * result + (channel != null ? channel.hashCode() : 0);
result = 31 * result + (request != null ? request.hashCode() : 0);
result = 31 * result + (isStopRun() ? 1 : 0);
......@@ -50,7 +50,7 @@ public class RequestTask implements Runnable {
if (!(o instanceof RequestTask))
return false;
final RequestTask that = (RequestTask)o;
final RequestTask that = (RequestTask) o;
if (getCreateTimestamp() != that.getCreateTimestamp())
return false;
......
......@@ -18,15 +18,15 @@
package org.apache.rocketmq.remoting.protocol;
public enum LanguageCode {
JAVA((byte)0),
CPP((byte)1),
DOTNET((byte)2),
PYTHON((byte)3),
DELPHI((byte)4),
ERLANG((byte)5),
RUBY((byte)6),
OTHER((byte)7),
HTTP((byte)8);
JAVA((byte) 0),
CPP((byte) 1),
DOTNET((byte) 2),
PYTHON((byte) 3),
DELPHI((byte) 4),
ERLANG((byte) 5),
RUBY((byte) 6),
OTHER((byte) 7),
HTTP((byte) 8);
private byte code;
......
......@@ -195,7 +195,7 @@ public class RemotingCommand {
}
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte)((source >> 24) & 0xFF));
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
public static int createNewRequestId() {
......@@ -223,9 +223,9 @@ public class RemotingCommand {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte)((source >> 16) & 0xFF);
result[2] = (byte)((source >> 8) & 0xFF);
result[3] = (byte)(source & 0xFF);
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
......
......@@ -51,11 +51,11 @@ public class RocketMQSerializable {
// ################### content
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short)cmd.getCode());
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short)cmd.getVersion());
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
......@@ -109,7 +109,7 @@ public class RocketMQSerializable {
key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8);
val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8);
content.putShort((short)key.length);
content.putShort((short) key.length);
content.put(key);
content.putInt(val.length);
......
......@@ -18,8 +18,8 @@
package org.apache.rocketmq.remoting.protocol;
public enum SerializeType {
JSON((byte)0),
ROCKETMQ((byte)1);
JSON((byte) 0),
ROCKETMQ((byte) 1);
private byte code;
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: MixTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: MixTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......@@ -37,11 +39,16 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class NettyRPCTest {
private static RemotingServer remotingServer;
private static RemotingClient remotingClient;
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
......@@ -71,84 +78,70 @@ public class NettyRPCTest {
return client;
}
@BeforeClass
public static void initialize() throws InterruptedException {
remotingServer = createRemotingServer();
remotingClient = createRemotingClient();
}
@AfterClass
public static void destroy() {
remotingClient.shutdown();
remotingServer.shutdown();
}
@Test
public void test_RPC_Sync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RemotingServer server = createRemotingServer();
RemotingClient client = createRemotingClient();
for (int i = 0; i < 100; i++) {
TestRequestHeader requestHeader = new TestRequestHeader();
requestHeader.setCount(i);
requestHeader.setMessageTitle("HelloMessageTitle");
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3000);
System.out.println("invoke result = " + response);
assertTrue(response != null);
}
client.shutdown();
server.shutdown();
System.out.println("-----------------------------------------------------------------");
}
@Test
public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
RemotingServer server = createRemotingServer();
RemotingClient client = createRemotingClient();
for (int i = 0; i < 100; i++) {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark(String.valueOf(i));
client.invokeOneway("localhost:8888", request, 1000 * 3);
remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
}
client.shutdown();
server.shutdown();
System.out.println("-----------------------------------------------------------------");
}
@Test
public void test_RPC_Async() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
RemotingServer server = createRemotingServer();
RemotingClient client = createRemotingClient();
for (int i = 0; i < 100; i++) {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark(String.valueOf(i));
client.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
System.out.println(responseFuture.getResponseCommand());
}
});
}
Thread.sleep(1000 * 3);
client.shutdown();
server.shutdown();
System.out.println("-----------------------------------------------------------------");
}
@Test
public void test_server_call_client() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
final RemotingServer server = createRemotingServer();
final RemotingClient client = createRemotingClient();
server.registerProcessor(0, new NettyRequestProcessor() {
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
try {
return server.invokeSync(ctx.channel(), request, 1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingSendRequestException e) {
e.printStackTrace();
} catch (RemotingTimeoutException e) {
return remotingServer.invokeSync(ctx.channel(), request, 1000 * 10);
} catch (InterruptedException | RemotingSendRequestException | RemotingTimeoutException e) {
e.printStackTrace();
}
......@@ -161,7 +154,7 @@ public class NettyRPCTest {
}
}, Executors.newCachedThreadPool());
client.registerProcessor(0, new NettyRequestProcessor() {
remotingClient.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
System.out.println("client receive server request = " + request);
......@@ -177,14 +170,10 @@ public class NettyRPCTest {
for (int i = 0; i < 3; i++) {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
System.out.println("invoke result = " + response);
assertTrue(response != null);
}
client.shutdown();
server.shutdown();
System.out.println("-----------------------------------------------------------------");
}
}
......
......@@ -306,7 +306,7 @@ public class AllocateMappedFileService extends ServiceThread {
return false;
if (getClass() != obj.getClass())
return false;
AllocateRequest other = (AllocateRequest)obj;
AllocateRequest other = (AllocateRequest) obj;
if (filePath == null) {
if (other.filePath != null)
return false;
......
......@@ -20,8 +20,6 @@ import java.nio.ByteBuffer;
/**
* Write messages callback interface
*
*
*/
public interface AppendMessageCallback {
......@@ -31,7 +29,6 @@ public interface AppendMessageCallback {
* @param byteBuffer
* @param maxBlank
* @param msg
*
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
......
......@@ -18,7 +18,6 @@ package org.apache.rocketmq.store;
/**
* When write a message to the commit log, returns results
*
*/
public class AppendMessageResult {
// Return code
......
......@@ -18,8 +18,6 @@ package org.apache.rocketmq.store;
/**
* When write a message to the commit log, returns code
*
*
*/
public enum AppendMessageStatus {
PUT_OK,
......
......@@ -143,7 +143,7 @@ public class CommitLog {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int)(offset % mappedFileSize);
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
......@@ -637,7 +637,7 @@ public class CommitLog {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService)this.flushCommitLogService;
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
......@@ -729,7 +729,7 @@ public class CommitLog {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int)(offset % mappedFileSize);
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
......@@ -1150,7 +1150,7 @@ public class CommitLog {
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final short propertiesLength = propertiesData == null ? 0 : (short)propertiesData.length;
final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
......@@ -1226,7 +1226,7 @@ public class CommitLog {
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte)topicLength);
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort(propertiesLength);
......
......@@ -131,7 +131,7 @@ public class ConsumeQueue {
if (mappedFile != null) {
long offset = 0;
int low =
minLogicOffset > mappedFile.getFileFromOffset() ? (int)(minLogicOffset - mappedFile
minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile
.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
......@@ -407,7 +407,7 @@ public class ConsumeQueue {
byteBuffer.putInt(Integer.MAX_VALUE);
byteBuffer.putLong(0L);
int until = (int)(untilWhere % this.mappedFileQueue.getMappedFileSize());
int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
mappedFile.appendMessage(byteBuffer.array());
}
......@@ -419,7 +419,7 @@ public class ConsumeQueue {
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int)(offset % mappedFileSize));
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
......
......@@ -459,7 +459,7 @@ public class DefaultMessageStore implements MessageStore {
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
......@@ -1016,7 +1016,7 @@ public class DefaultMessageStore implements MessageStore {
}
private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
return (maxOffsetPy - offsetPy) > memory;
}
......@@ -1288,6 +1288,24 @@ public class DefaultMessageStore implements MessageStore {
return brokerStatsManager;
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.remainBufferNumbs();
}
@Override
public boolean isTransientStorePoolDeficient() {
return remainTransientStoreBufferNumbs() == 0;
}
public void unlockMappedFile(final MappedFile mappedFile) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
mappedFile.munlock();
}
}, 6, TimeUnit.SECONDS);
}
class CleanCommitLogService {
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
......@@ -1565,6 +1583,10 @@ public class DefaultMessageStore implements MessageStore {
return reputFromOffset;
}
public void setReputFromOffset(long reputFromOffset) {
this.reputFromOffset = reputFromOffset;
}
@Override
public void shutdown() {
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
......@@ -1582,10 +1604,6 @@ public class DefaultMessageStore implements MessageStore {
super.shutdown();
}
public void setReputFromOffset(long reputFromOffset) {
this.reputFromOffset = reputFromOffset;
}
public long behind() {
return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}
......@@ -1683,22 +1701,4 @@ public class DefaultMessageStore implements MessageStore {
}
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.remainBufferNumbs();
}
@Override
public boolean isTransientStorePoolDeficient() {
return remainTransientStoreBufferNumbs() == 0;
}
public void unlockMappedFile(final MappedFile mappedFile) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
mappedFile.munlock();
}
}, 6, TimeUnit.SECONDS);
}
}
......@@ -86,7 +86,7 @@ public class GetMessageResult {
this.messageMapedList.add(mapedBuffer);
this.messageBufferList.add(mapedBuffer.getByteBuffer());
this.bufferTotalSize += mapedBuffer.getSize();
this.msgCount4Commercial += (int)Math.ceil(
this.msgCount4Commercial += (int) Math.ceil(
mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}
......
......@@ -125,7 +125,7 @@ public class MappedFile extends ReferenceResource {
}
}
ByteBuffer viewedBuffer = (ByteBuffer)invoke(buffer, methodName);
ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
if (viewedBuffer == null)
return buffer;
else
......@@ -463,7 +463,7 @@ public class MappedFile extends ReferenceResource {
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte)0);
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
......@@ -522,7 +522,7 @@ public class MappedFile extends ReferenceResource {
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer)(this.mappedByteBuffer)).address();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
......@@ -537,7 +537,7 @@ public class MappedFile extends ReferenceResource {
public void munlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer)(this.mappedByteBuffer)).address();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
......
......@@ -81,13 +81,13 @@ public class MappedFileQueue {
return null;
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile)mfs[i];
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile)mfs[mfs.length - 1];
return (MappedFile) mfs[mfs.length - 1];
}
private Object[] copyMappedFiles(final int reservedMappedFiles) {
......@@ -108,9 +108,9 @@ public class MappedFileQueue {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
file.setWrotePosition((int)(offset % this.mappedFileSize));
file.setCommittedPosition((int)(offset % this.mappedFileSize));
file.setFlushedPosition((int)(offset % this.mappedFileSize));
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
file.destroy(1000);
willRemoveFiles.add(file);
......@@ -273,7 +273,7 @@ public class MappedFileQueue {
while (iterator.hasPrevious()) {
mappedFileLast = iterator.previous();
if (offset >= mappedFileLast.getFileFromOffset()) {
int where = (int)(offset % mappedFileLast.getFileSize());
int where = (int) (offset % mappedFileLast.getFileSize());
mappedFileLast.setFlushedPosition(where);
mappedFileLast.setWrotePosition(where);
mappedFileLast.setCommittedPosition(where);
......@@ -347,7 +347,7 @@ public class MappedFileQueue {
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile)mfs[i];
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (mappedFile.destroy(intervalForcibly)) {
......@@ -387,7 +387,7 @@ public class MappedFileQueue {
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile)mfs[i];
MappedFile mappedFile = (MappedFile) mfs[i];
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
......@@ -457,7 +457,7 @@ public class MappedFileQueue {
try {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
int index = (int)((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
"mappedFileSize: {}, mappedFiles count: {}",
......@@ -510,7 +510,7 @@ public class MappedFileQueue {
Object[] mfs = this.copyMappedFiles(0);
if (mfs != null) {
for (Object mf : mfs) {
if (((ReferenceResource)mf).isAvailable()) {
if (((ReferenceResource) mf).isAvailable()) {
size += this.mappedFileSize;
}
}
......
......@@ -30,7 +30,7 @@ public class StoreUtil {
long physicalTotal = 1024 * 1024 * 1024 * 24;
OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
physicalTotal = ((com.sun.management.OperatingSystemMXBean)osmxb).getTotalPhysicalMemorySize();
physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
}
return physicalTotal;
......
......@@ -50,7 +50,7 @@ public class TransientStorePool {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer)byteBuffer).address();
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
......@@ -60,7 +60,7 @@ public class TransientStorePool {
public void destroy() {
for (ByteBuffer byteBuffer : availableBuffers) {
final long address = ((DirectBuffer)byteBuffer).address();
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
}
......
......@@ -182,8 +182,8 @@ public class MessageStoreConfig {
public int getMapedFileSizeConsumeQueue() {
int factor = (int)Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
return (int)(factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
}
public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
......@@ -444,14 +444,14 @@ public class MessageStoreConfig {
return brokerRole;
}
public void setBrokerRole(String brokerRole) {
this.brokerRole = BrokerRole.valueOf(brokerRole);
}
public void setBrokerRole(BrokerRole brokerRole) {
this.brokerRole = brokerRole;
}
public void setBrokerRole(String brokerRole) {
this.brokerRole = BrokerRole.valueOf(brokerRole);
}
public int getHaTransferBatchSize() {
return haTransferBatchSize;
}
......@@ -472,14 +472,14 @@ public class MessageStoreConfig {
return flushDiskType;
}
public void setFlushDiskType(String type) {
this.flushDiskType = FlushDiskType.valueOf(type);
}
public void setFlushDiskType(FlushDiskType flushDiskType) {
this.flushDiskType = flushDiskType;
}
public void setFlushDiskType(String type) {
this.flushDiskType = FlushDiskType.valueOf(type);
}
public int getSyncFlushTimeout() {
return syncFlushTimeout;
}
......@@ -570,6 +570,7 @@ public class MessageStoreConfig {
/**
* Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is ASYNC_FLUSH
*
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
......
......@@ -207,7 +207,7 @@ public class HAService {
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel)k.channel()).accept();
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
......
......@@ -124,7 +124,7 @@ public class IndexFile {
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int)timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
......@@ -183,15 +183,8 @@ public class IndexFile {
boolean result =
begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
result =
result
|| (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
.getEndTimestamp());
result =
result
|| (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
.getEndTimestamp());
result = result || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader.getEndTimestamp());
result = result || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader.getEndTimestamp());
return result;
}
......@@ -219,7 +212,7 @@ public class IndexFile {
|| this.indexHeader.getIndexCount() <= 1) {
// TODO NOTFOUND
} else {
for (int nextIndexToRead = slotValue; ; ) {
for (int nextIndexToRead = slotValue;;) {
if (phyOffsets.size() >= maxNum) {
break;
}
......@@ -231,7 +224,7 @@ public class IndexFile {
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long)this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
......
......@@ -106,7 +106,7 @@ public class IndexService {
if (files != null) {
List<IndexFile> fileList = new ArrayList<IndexFile>();
for (int i = 0; i < (files.length - 1); i++) {
IndexFile f = (IndexFile)files[i];
IndexFile f = (IndexFile) files[i];
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
......
......@@ -224,8 +224,6 @@ public class ScheduleMessageService extends ConfigManager {
}
/**
*
* @return
*/
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
......
......@@ -23,7 +23,7 @@ import com.sun.jna.Platform;
import com.sun.jna.Pointer;
public interface LibC extends Library {
LibC INSTANCE = (LibC)Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
int MADV_WILLNEED = 3;
int MADV_DONTNEED = 4;
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -15,6 +15,8 @@
* limitations under the License.
*
* $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*
* $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
......
......@@ -58,7 +58,7 @@
</module>
<module name="FileLength">
<property name="max" value="2000"/>
<property name="max" value="3000"/>
</module>
<module name="TreeWalker">
......@@ -107,9 +107,7 @@
<module name="StaticVariableName"/>
<module name="TypeName"/>
<!--Checks that there are no import statements that use the * notation-->
<!--
<module name="AvoidStarImport"/>
-->
<!--whitespace-->
<module name="GenericWhitespace"/>
......
......@@ -708,7 +708,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
consumerGroup, clientId, msgId, timeoutMillis * 3);
} else {
MessageClientExt msgClient = (MessageClientExt)msg;
MessageClientExt msgClient = (MessageClientExt) msg;
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
consumerGroup, clientId, msgClient.getOffsetMsgId(), timeoutMillis * 3);
}
......
......@@ -194,7 +194,7 @@ public class MQAdminStartup {
private static void initLogback() throws JoranException {
String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
......
......@@ -161,7 +161,7 @@ public class CLusterSendMsgRTCommand implements SubCommand {
}
}
double rt = (double)elapsed / (amount - 1);
double rt = (double) elapsed / (amount - 1);
if (!printAsTlog) {
System.out.printf("%-24s %-24s %-8s %-16s %-16s%n",
clusterName,
......
......@@ -141,7 +141,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
groupConsumeInfo.setGroup(consumerGroup);
if (consumeStats != null) {
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
}
......@@ -251,7 +251,7 @@ class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
return o.count - this.count;
}
return (int)(o.diffTotal - diffTotal);
return (int) (o.diffTotal - diffTotal);
}
public int getConsumeTps() {
......
......@@ -88,7 +88,7 @@ public class CheckMsgSendRTCommand implements SubCommand {
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int queueIndex = (Integer)arg % mqs.size();
int queueIndex = (Integer) arg % mqs.size();
MessageQueue queue = mqs.get(queueIndex);
brokerName = queue.getBrokerName();
queueId = queue.getQueueId();
......@@ -114,7 +114,7 @@ public class CheckMsgSendRTCommand implements SubCommand {
);
}
double rt = (double)timeElapsed / (amount - 1);
double rt = (double) timeElapsed / (amount - 1);
System.out.printf("Avg RT: %s%n", String.format("%.2f", rt));
} catch (Exception e) {
e.printStackTrace();
......
......@@ -246,7 +246,7 @@ public class PrintMessageByQueueCommand implements SubCommand {
@Override
public int compareTo(final TagCountBean o) {
return (int)(o.getCount().get() - this.count.get());
return (int) (o.getCount().get() - this.count.get());
}
}
}
......@@ -57,7 +57,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
String bodyTmpFilePath = createBodyFile(msg);
String msgId = msg.getMsgId();
if (msg instanceof MessageClientExt) {
msgId = ((MessageClientExt)msg).getOffsetMsgId();
msgId = ((MessageClientExt) msg).getOffsetMsgId();
}
System.out.printf("%-20s %s%n",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册