diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java index 702d561fafa94175921c68b3393a0004af1b929d..6d8995a1564fe92e7efd85101048adc8b32c9e2b 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; import io.openmessaging.Message; import io.openmessaging.OMS; +import io.openmessaging.exception.OMSMessageFormatException; import org.apache.commons.lang3.builder.ToStringBuilder; public class BytesMessageImpl implements BytesMessage { @@ -33,8 +34,12 @@ public class BytesMessageImpl implements BytesMessage { } @Override - public byte[] getBody() { - return body; + public T getBody(Class type) throws OMSMessageFormatException { + if (type == byte[].class) { + return (T)body; + } + + throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName()); } @Override diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 23021413c8079c39a4d2ab419a60274244e1e45d..66af8cebb9163a9eabc494efd0864f2bc6c9634e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -46,7 +46,7 @@ public class OMSUtil { public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); - rmqMessage.setBody(omsMessage.getBody()); + rmqMessage.setBody(omsMessage.getBody(byte[].class)); KeyValue sysHeaders = omsMessage.sysHeaders(); KeyValue userHeaders = omsMessage.userHeaders(); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index da2e8a084fe89e44b7e933351e5b9ee2a48422ae..5a0fd9c8c3819c2edfa1199fac2d17c9d59377b2 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -83,7 +83,7 @@ public class PullConsumerImplTest { Message message = consumer.receive(); assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); - assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); } @Test diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index b55816b898958f942d8522cd8104cc28830498a9..d80e02622dbf7f259f65ac3c0abc73557a4c7e46 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -75,7 +75,7 @@ public class PushConsumerImplTest { @Override public void onReceived(Message message, Context context) { assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); - assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); context.ack(); } }); diff --git a/pom.xml b/pom.xml index 5ec47f31554b2b58c00c1561f9e620e7edce1f09..d26e78f7634432e4048d0d429f37715f5b237e8a 100644 --- a/pom.xml +++ b/pom.xml @@ -592,7 +592,7 @@ io.openmessaging openmessaging-api - 0.3.0-alpha + 0.3.1-alpha-SNAPSHOT log4j