diff --git a/docs/en/Example_OpenMessaging.md b/docs/en/Example_OpenMessaging.md new file mode 100644 index 0000000000000000000000000000000000000000..026e76e902d2f7abdeab4e95bb078fc4f15f6654 --- /dev/null +++ b/docs/en/Example_OpenMessaging.md @@ -0,0 +1,118 @@ +# OpenMessaging Example +[OpenMessaging](https://openmessaging.github.io/), which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, ecommerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems. + +RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, the following examples demonstrate how to access RocketMQ based on OpenMessaging. + +## OMSProducer +The following example shows how to send message to RocketMQ broker in synchronous, asynchronous, or one-way transmissions. + +``` +public class OMSProducer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final Producer producer = messagingAccessPoint.createProducer(); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + producer.startup(); + System.out.printf("Producer startup OK%n"); + + { + Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + SendResult sendResult = producer.send(message); + System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId()); + } + + { + final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new PromiseListener() { + @Override + public void operationCompleted(Promise promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); + } + + @Override + public void operationFailed(Promise promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + } + }); + } + + { + producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + System.out.printf("Send oneway message OK%n"); + } + + producer.shutdown(); + messagingAccessPoint.shutdown(); + } +} +``` +## OMSPullConsumer +Use OMS PullConsumer to poll messages from a specified queue. + +``` +public class OMSPullConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + + Message message = consumer.poll(); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.printf("Received one message: %s%n", msgId); + consumer.ack(msgId); + } + + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } +} + +``` +## OMSPushConsumer +Attaches OMS PushConsumer to a specified queue and consumes messages by MessageListener + +``` +public class OMSPushConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PushConsumer consumer = messagingAccessPoint. + createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + context.ack(); + } + }); + + } +} +```