diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index ac7f7c479b0c6b6ee2fc0e8fba85d3710fa95ac1..9d162ac1463a048e457d9e342fc490785a1f75d8 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -33,10 +33,10 @@ public class SimpleProducer { final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); - System.out.println("messagingAccessPoint startup OK"); + System.out.printf("MessagingAccessPoint startup OK%n"); producer.startup(); - System.out.println("producer startup OK"); + System.out.printf("Producer startup OK%n"); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override @@ -50,25 +50,27 @@ public class SimpleProducer { Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); //final Void aVoid = result.get(3000L); - System.out.println("send async message OK, msgId: " + sendResult.messageId()); + System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); } { final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new PromiseListener() { - @Override public void operationCompleted(Promise promise) { - System.out.println("Send async message OK, msgId: " + promise.get().messageId()); + @Override + public void operationCompleted(Promise promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); } - @Override public void operationFailed(Promise promise) { - System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage()); + @Override + public void operationFailed(Promise promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); } }); } { producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); - System.out.println("Send oneway message OK"); + System.out.printf("Send oneway message OK%n"); } } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 36b6b1d12a2d18c03f756540aee20d5f68596dab..8e067724dee51049f58b20a19f6a73f53a80f975 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -33,7 +33,7 @@ public class SimplePullConsumer { OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); messagingAccessPoint.startup(); - System.out.println("messagingAccessPoint startup OK"); + System.out.printf("MessagingAccessPoint startup OK%n"); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override @@ -44,13 +44,13 @@ public class SimplePullConsumer { })); consumer.startup(); - System.out.println("consumer startup OK"); + System.out.printf("Consumer startup OK%n"); while (true) { Message message = consumer.poll(); if (message != null) { String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); - System.out.println("Received one message: " + msgId); + System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index 84c1b15243c31b35a48739b1af94fc2712f4e3ba..b0935d4c74306513d999b5c60b23bec348b52999 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -35,7 +35,7 @@ public class SimplePushConsumer { createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); messagingAccessPoint.startup(); - System.out.println("messagingAccessPoint startup OK"); + System.out.printf("MessagingAccessPoint startup OK%n"); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override @@ -48,12 +48,12 @@ public class SimplePushConsumer { consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { @Override public void onMessage(final Message message, final ReceivedMessageContext context) { - System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID)); + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); context.ack(); } }); consumer.startup(); - System.out.println("consumer startup OK"); + System.out.printf("Consumer startup OK%n"); } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 4dfdca67073657900d92e3326d2b617484cee9df..90f9e03ed3db804056c36a21f5c136831f4ad59f 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -175,7 +175,6 @@ class LocalMessageCache implements ServiceLifecycle { try { if (!msgTreeMap.isEmpty()) { msg = msgTreeMap.firstEntry().getValue(); - System.out.println(msg); if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { //Expired, ack and remove it. diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index a2694e486a02cf4f088b93450f67132d8f50fd79..f9b8058e04af862da90edb074c6d3d7b52ff603f 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -25,9 +25,9 @@ import io.openmessaging.PushConsumer; import io.openmessaging.ReceivedMessageContext; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.OMSUtil; -import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +47,6 @@ public class PushConsumerImpl implements PushConsumer { private final Map subscribeTable = new ConcurrentHashMap<>(); private final ClientConfig clientConfig; - public PushConsumerImpl(final KeyValue properties) { this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.properties = properties; @@ -130,7 +129,8 @@ public class PushConsumerImpl implements PushConsumer { class MessageListenerImpl implements MessageListenerConcurrently { @Override - public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, ConsumeConcurrentlyContext contextRMQ) { + public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, + ConsumeConcurrentlyContext contextRMQ) { MessageExt rmqMsg = rmqMsgList.get(0); BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 0b2db4d18645d57cd0181194abe4670f6e1a79a0..8246bcd294ac1987dedcebdea398bceaa597ef67 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -41,7 +41,7 @@ import org.slf4j.Logger; import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; -abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ +abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { final static Logger log = ClientLogger.getLog(); final KeyValue properties; final DefaultMQProducer rocketmqProducer; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index f644e7d487d3be24f4f752fa461d26d095ec42e6..2c00c60ebc476137dd0ff39d4f495034f82b6882 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -25,8 +25,8 @@ import io.openmessaging.Promise; import io.openmessaging.PropertyKeys; import io.openmessaging.SendResult; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.promise.DefaultPromise; +import io.openmessaging.rocketmq.utils.OMSUtil; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendStatus; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java index f03826e858e3c804ab57b09fed70a10ace82a2ac..05225cc5058aa1d31fadc335ff0d2a86f0e03337 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -78,7 +78,7 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements Sequenc try { SendResult sendResult = this.rocketmqProducer.send(rmqMessages); - String [] msgIdArray = sendResult.getMsgId().split(","); + String[] msgIdArray = sendResult.getMsgId().split(","); for (int i = 0; i < messages.size(); i++) { Message message = messages.get(i); message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index 3e4bd266ce117fb52dd78b3cf0caa9f624e29972..c863ccf6ade2848eb61cc2cca789cc9bb2d50366 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -77,7 +77,7 @@ public class DefaultPromise implements Promise { } else { long waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime > 0) { - for (; ; ) { + for (;; ) { try { lock.wait(waitTime); } catch (InterruptedException e) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java index 9e2f69c2d9ee921aad72a84a35f9972b25f80fcc..84b6c2d74efb4df2125e0406977a17e27bce726a 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java @@ -18,11 +18,17 @@ package io.openmessaging.rocketmq.promise; public enum FutureState { - /** the task is doing **/ + /** + * the task is doing + **/ DOING(0), - /** the task is done **/ + /** + * the task is done + **/ DONE(1), - /** ths task is cancelled **/ + /** + * ths task is cancelled + **/ CANCELLED(2); public final int value; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java index d8eed843b685a3d1ab551aaa9bbb558fd11e24c9..104d3d964fa87f758ecbd5fa39848626a6dab447 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -29,7 +29,7 @@ import org.slf4j.Logger; public final class BeanUtils { final static Logger log = ClientLogger.getLog(); - + /** * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */ @@ -136,7 +136,7 @@ public final class BeanUtils { public static T populate(final Properties properties, final T obj) { Class clazz = obj.getClass(); try { - + Set> entries = properties.entrySet(); for (Map.Entry entry : entries) { String entryKey = entry.getKey().toString(); @@ -147,7 +147,7 @@ public final class BeanUtils { } String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); try { - setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { //ignored... }