提交 b7ec4121 编写于 作者: Y yukon

OpenMessaging code reformat.

上级 6edeb831
......@@ -33,10 +33,10 @@ public class SimpleProducer {
final Producer producer = messagingAccessPoint.createProducer();
messagingAccessPoint.startup();
System.out.println("messagingAccessPoint startup OK");
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
System.out.println("producer startup OK");
System.out.printf("Producer startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
......@@ -50,25 +50,27 @@ public class SimpleProducer {
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.println("send async message OK, msgId: " + sendResult.messageId());
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
}
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override public void operationCompleted(Promise<SendResult> promise) {
System.out.println("Send async message OK, msgId: " + promise.get().messageId());
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
@Override public void operationFailed(Promise<SendResult> promise) {
System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage());
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.println("Send oneway message OK");
System.out.printf("Send oneway message OK%n");
}
}
}
......@@ -33,7 +33,7 @@ public class SimplePullConsumer {
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.println("messagingAccessPoint startup OK");
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
......@@ -44,13 +44,13 @@ public class SimplePullConsumer {
}));
consumer.startup();
System.out.println("consumer startup OK");
System.out.printf("Consumer startup OK%n");
while (true) {
Message message = consumer.poll();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
System.out.println("Received one message: " + msgId);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
}
}
......
......@@ -35,7 +35,7 @@ public class SimplePushConsumer {
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.println("messagingAccessPoint startup OK");
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
......@@ -48,12 +48,12 @@ public class SimplePushConsumer {
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID));
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});
consumer.startup();
System.out.println("consumer startup OK");
System.out.printf("Consumer startup OK%n");
}
}
......@@ -175,7 +175,6 @@ class LocalMessageCache implements ServiceLifecycle {
try {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
System.out.println(msg);
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
......
......@@ -25,9 +25,9 @@ import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -47,7 +47,6 @@ public class PushConsumerImpl implements PushConsumer {
private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
private final ClientConfig clientConfig;
public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer();
this.properties = properties;
......@@ -130,7 +129,8 @@ public class PushConsumerImpl implements PushConsumer {
class MessageListenerImpl implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList, ConsumeConcurrentlyContext contextRMQ) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
ConsumeConcurrentlyContext contextRMQ) {
MessageExt rmqMsg = rmqMsgList.get(0);
BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
......
......@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final static Logger log = ClientLogger.getLog();
final KeyValue properties;
final DefaultMQProducer rocketmqProducer;
......
......@@ -25,8 +25,8 @@ import io.openmessaging.Promise;
import io.openmessaging.PropertyKeys;
import io.openmessaging.SendResult;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus;
......
......@@ -78,7 +78,7 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements Sequenc
try {
SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
String [] msgIdArray = sendResult.getMsgId().split(",");
String[] msgIdArray = sendResult.getMsgId().split(",");
for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
......
......@@ -77,7 +77,7 @@ public class DefaultPromise<V> implements Promise<V> {
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime > 0) {
for (; ; ) {
for (;; ) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
......
......@@ -18,11 +18,17 @@
package io.openmessaging.rocketmq.promise;
public enum FutureState {
/** the task is doing **/
/**
* the task is doing
**/
DOING(0),
/** the task is done **/
/**
* the task is done
**/
DONE(1),
/** ths task is cancelled **/
/**
* ths task is cancelled
**/
CANCELLED(2);
public final int value;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册