未验证 提交 bcd83c45 编写于 作者: B Bill Lee 提交者: Dawid Wysakowicz

[FLINK-8544] Handle null message key and value in JSONKeyValueDeserializationSchema

This closes #5516
上级 afb8f825
......@@ -56,8 +56,12 @@ public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSc
mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
node.set("key", mapper.readValue(messageKey, JsonNode.class));
node.set("value", mapper.readValue(message, JsonNode.class));
if (messageKey != null) {
node.set("key", mapper.readValue(messageKey, JsonNode.class));
}
if (message != null) {
node.set("value", mapper.readValue(message, JsonNode.class));
}
if (includeMetadata) {
node.putObject("metadata")
.put("offset", offset)
......
......@@ -50,6 +50,40 @@ public class JSONKeyValueDeserializationSchemaTest {
Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
}
@Test
public void testDeserializeWithoutKey() throws IOException {
ObjectMapper mapper = new ObjectMapper();
byte[] serializedKey = null;
ObjectNode initialValue = mapper.createObjectNode();
initialValue.put("word", "world");
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
Assert.assertTrue(deserializedValue.get("metadata") == null);
Assert.assertTrue(deserializedValue.get("key") == null);
Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
}
@Test
public void testDeserializeWithoutValue() throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode initialKey = mapper.createObjectNode();
initialKey.put("index", 4);
byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
byte[] serializedValue = null;
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
Assert.assertTrue(deserializedValue.get("metadata") == null);
Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
Assert.assertTrue(deserializedValue.get("value") == null);
}
@Test
public void testDeserializeWithMetadata() throws IOException {
ObjectMapper mapper = new ObjectMapper();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册