提交 c508cb33 编写于 作者: S shutian.lzh

Accomodate updated openmessaging api

上级 173f77d2
...@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage; ...@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.Message;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.exception.OMSMessageFormatException;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
public class BytesMessageImpl implements BytesMessage { public class BytesMessageImpl implements BytesMessage {
...@@ -33,8 +34,12 @@ public class BytesMessageImpl implements BytesMessage { ...@@ -33,8 +34,12 @@ public class BytesMessageImpl implements BytesMessage {
} }
@Override @Override
public byte[] getBody() { public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
return body; if (type == byte[].class) {
return (T)body;
}
throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
} }
@Override @Override
......
...@@ -46,7 +46,7 @@ public class OMSUtil { ...@@ -46,7 +46,7 @@ public class OMSUtil {
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody()); rmqMessage.setBody(omsMessage.getBody(byte[].class));
KeyValue sysHeaders = omsMessage.sysHeaders(); KeyValue sysHeaders = omsMessage.sysHeaders();
KeyValue userHeaders = omsMessage.userHeaders(); KeyValue userHeaders = omsMessage.userHeaders();
......
...@@ -83,7 +83,7 @@ public class PullConsumerImplTest { ...@@ -83,7 +83,7 @@ public class PullConsumerImplTest {
Message message = consumer.receive(); Message message = consumer.receive();
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody);
} }
@Test @Test
......
...@@ -75,7 +75,7 @@ public class PushConsumerImplTest { ...@@ -75,7 +75,7 @@ public class PushConsumerImplTest {
@Override @Override
public void onReceived(Message message, Context context) { public void onReceived(Message message, Context context) {
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody);
context.ack(); context.ack();
} }
}); });
......
...@@ -592,7 +592,7 @@ ...@@ -592,7 +592,7 @@
<dependency> <dependency>
<groupId>io.openmessaging</groupId> <groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId> <artifactId>openmessaging-api</artifactId>
<version>0.3.0-alpha</version> <version>0.3.1-alpha-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>log4j</groupId> <groupId>log4j</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册