From b7ec41213b5b7acf0109e41dc253e09cfdd4b4ba Mon Sep 17 00:00:00 2001 From: yukon Date: Mon, 24 Apr 2017 15:08:15 +0800 Subject: [PATCH] OpenMessaging code reformat. --- .../example/openmessaging/SimpleProducer.java | 18 ++++++++++-------- .../openmessaging/SimplePullConsumer.java | 6 +++--- .../openmessaging/SimplePushConsumer.java | 6 +++--- .../rocketmq/consumer/LocalMessageCache.java | 1 - .../rocketmq/consumer/PushConsumerImpl.java | 6 +++--- .../rocketmq/producer/AbstractOMSProducer.java | 2 +- .../rocketmq/producer/ProducerImpl.java | 2 +- .../producer/SequenceProducerImpl.java | 2 +- .../rocketmq/promise/DefaultPromise.java | 2 +- .../rocketmq/promise/FutureState.java | 12 +++++++++--- .../rocketmq/utils/BeanUtils.java | 6 +++--- 11 files changed, 35 insertions(+), 28 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index ac7f7c47..9d162ac1 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -33,10 +33,10 @@ public class SimpleProducer { final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); - System.out.println("messagingAccessPoint startup OK"); + System.out.printf("MessagingAccessPoint startup OK%n"); producer.startup(); - System.out.println("producer startup OK"); + System.out.printf("Producer startup OK%n"); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override @@ -50,25 +50,27 @@ public class SimpleProducer { Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); //final Void aVoid = result.get(3000L); - System.out.println("send async message OK, msgId: " + sendResult.messageId()); + System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); } { final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new PromiseListener() { - @Override public void operationCompleted(Promise promise) { - System.out.println("Send async message OK, msgId: " + promise.get().messageId()); + @Override + public void operationCompleted(Promise promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); } - @Override public void operationFailed(Promise promise) { - System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage()); + @Override + public void operationFailed(Promise promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); } }); } { producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); - System.out.println("Send oneway message OK"); + System.out.printf("Send oneway message OK%n"); } } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 36b6b1d1..8e067724 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -33,7 +33,7 @@ public class SimplePullConsumer { OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); messagingAccessPoint.startup(); - System.out.println("messagingAccessPoint startup OK"); + System.out.printf("MessagingAccessPoint startup OK%n"); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override @@ -44,13 +44,13 @@ public class SimplePullConsumer { })); consumer.startup(); - System.out.println("consumer startup OK"); + System.out.printf("Consumer startup OK%n"); while (true) { Message message = consumer.poll(); if (message != null) { String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); - System.out.println("Received one message: " + msgId); + System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index 84c1b152..b0935d4c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -35,7 +35,7 @@ public class SimplePushConsumer { createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); messagingAccessPoint.startup(); - System.out.println("messagingAccessPoint startup OK"); + System.out.printf("MessagingAccessPoint startup OK%n"); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override @@ -48,12 +48,12 @@ public class SimplePushConsumer { consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { @Override public void onMessage(final Message message, final ReceivedMessageContext context) { - System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID)); + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); context.ack(); } }); consumer.startup(); - System.out.println("consumer startup OK"); + System.out.printf("Consumer startup OK%n"); } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 4dfdca67..90f9e03e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -175,7 +175,6 @@ class LocalMessageCache implements ServiceLifecycle { try { if (!msgTreeMap.isEmpty()) { msg = msgTreeMap.firstEntry().getValue(); - System.out.println(msg); if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { //Expired, ack and remove it. diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index a2694e48..f9b8058e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -25,9 +25,9 @@ import io.openmessaging.PushConsumer; import io.openmessaging.ReceivedMessageContext; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.OMSUtil; -import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +47,6 @@ public class PushConsumerImpl implements PushConsumer { private final Map subscribeTable = new ConcurrentHashMap<>(); private final ClientConfig clientConfig; - public PushConsumerImpl(final KeyValue properties) { this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.properties = properties; @@ -130,7 +129,8 @@ public class PushConsumerImpl implements PushConsumer { class MessageListenerImpl implements MessageListenerConcurrently { @Override - public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, ConsumeConcurrentlyContext contextRMQ) { + public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, + ConsumeConcurrentlyContext contextRMQ) { MessageExt rmqMsg = rmqMsgList.get(0); BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 0b2db4d1..8246bcd2 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -41,7 +41,7 @@ import org.slf4j.Logger; import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; -abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ +abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { final static Logger log = ClientLogger.getLog(); final KeyValue properties; final DefaultMQProducer rocketmqProducer; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index f644e7d4..2c00c60e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -25,8 +25,8 @@ import io.openmessaging.Promise; import io.openmessaging.PropertyKeys; import io.openmessaging.SendResult; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.promise.DefaultPromise; +import io.openmessaging.rocketmq.utils.OMSUtil; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendStatus; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java index f03826e8..05225cc5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -78,7 +78,7 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements Sequenc try { SendResult sendResult = this.rocketmqProducer.send(rmqMessages); - String [] msgIdArray = sendResult.getMsgId().split(","); + String[] msgIdArray = sendResult.getMsgId().split(","); for (int i = 0; i < messages.size(); i++) { Message message = messages.get(i); message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index 3e4bd266..c863ccf6 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -77,7 +77,7 @@ public class DefaultPromise implements Promise { } else { long waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime > 0) { - for (; ; ) { + for (;; ) { try { lock.wait(waitTime); } catch (InterruptedException e) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java index 9e2f69c2..84b6c2d7 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java @@ -18,11 +18,17 @@ package io.openmessaging.rocketmq.promise; public enum FutureState { - /** the task is doing **/ + /** + * the task is doing + **/ DOING(0), - /** the task is done **/ + /** + * the task is done + **/ DONE(1), - /** ths task is cancelled **/ + /** + * ths task is cancelled + **/ CANCELLED(2); public final int value; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java index d8eed843..104d3d96 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -29,7 +29,7 @@ import org.slf4j.Logger; public final class BeanUtils { final static Logger log = ClientLogger.getLog(); - + /** * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */ @@ -136,7 +136,7 @@ public final class BeanUtils { public static T populate(final Properties properties, final T obj) { Class clazz = obj.getClass(); try { - + Set> entries = properties.entrySet(); for (Map.Entry entry : entries) { String entryKey = entry.getKey().toString(); @@ -147,7 +147,7 @@ public final class BeanUtils { } String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); try { - setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { //ignored... } -- GitLab