From bcd83c45de8695b3455a3afaa513902eae2a84e7 Mon Sep 17 00:00:00 2001 From: Bill Lee Date: Sat, 17 Feb 2018 17:50:16 +0800 Subject: [PATCH] [FLINK-8544] Handle null message key and value in JSONKeyValueDeserializationSchema This closes #5516 --- .../JSONKeyValueDeserializationSchema.java | 8 +++-- ...JSONKeyValueDeserializationSchemaTest.java | 34 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index caffcec0a40..95b73e1ea49 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -56,8 +56,12 @@ public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSc mapper = new ObjectMapper(); } ObjectNode node = mapper.createObjectNode(); - node.set("key", mapper.readValue(messageKey, JsonNode.class)); - node.set("value", mapper.readValue(message, JsonNode.class)); + if (messageKey != null) { + node.set("key", mapper.readValue(messageKey, JsonNode.class)); + } + if (message != null) { + node.set("value", mapper.readValue(message, JsonNode.class)); + } if (includeMetadata) { node.putObject("metadata") .put("offset", offset) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java index fa79db1afd5..423f637c1f9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java @@ -50,6 +50,40 @@ public class JSONKeyValueDeserializationSchemaTest { Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); } + @Test + public void testDeserializeWithoutKey() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + byte[] serializedKey = null; + + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("word", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); + ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + + Assert.assertTrue(deserializedValue.get("metadata") == null); + Assert.assertTrue(deserializedValue.get("key") == null); + Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); + } + + @Test + public void testDeserializeWithoutValue() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialKey = mapper.createObjectNode(); + initialKey.put("index", 4); + byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + + byte[] serializedValue = null; + + JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); + ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + + Assert.assertTrue(deserializedValue.get("metadata") == null); + Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); + Assert.assertTrue(deserializedValue.get("value") == null); + } + @Test public void testDeserializeWithMetadata() throws IOException { ObjectMapper mapper = new ObjectMapper(); -- GitLab