提交 7b0b9d15 编写于 作者: S shutian.lzh

Fix producer example so that it quits normally

上级 f0a340f4
...@@ -24,6 +24,7 @@ import io.openmessaging.OMS; ...@@ -24,6 +24,7 @@ import io.openmessaging.OMS;
import io.openmessaging.producer.Producer; import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult; import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
public class SimpleProducer { public class SimpleProducer {
public static void main(String[] args) { public static void main(String[] args) {
...@@ -38,14 +39,6 @@ public class SimpleProducer { ...@@ -38,14 +39,6 @@ public class SimpleProducer {
producer.startup(); producer.startup();
System.out.printf("Producer startup OK%n"); System.out.printf("Producer startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));
{ {
Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message); SendResult sendResult = producer.send(message);
...@@ -53,6 +46,7 @@ public class SimpleProducer { ...@@ -53,6 +46,7 @@ public class SimpleProducer {
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
} }
final CountDownLatch countDownLatch = new CountDownLatch(1);
{ {
final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new FutureListener<SendResult>() { result.addListener(new FutureListener<SendResult>() {
...@@ -63,6 +57,7 @@ public class SimpleProducer { ...@@ -63,6 +57,7 @@ public class SimpleProducer {
} else { } else {
System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId()); System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
} }
countDownLatch.countDown();
} }
}); });
} }
...@@ -71,5 +66,12 @@ public class SimpleProducer { ...@@ -71,5 +66,12 @@ public class SimpleProducer {
producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n"); System.out.printf("Send oneway message OK%n");
} }
try {
countDownLatch.await();
} catch (InterruptedException ignore) {
}
producer.shutdown();
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册