diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml index e430ff4eee9e8af593ff9d3ee91862df85fb5a20..1cbb554a9293d393339da1acda0d62222b94b0fd 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml @@ -49,6 +49,16 @@ under the License. provided + + org.apache.flink + flink-table_2.10 + ${project.version} + provided + + true + + org.apache.kafka kafka_${scala.binary.version} @@ -101,7 +111,6 @@ under the License. jar - org.apache.curator curator-test @@ -124,7 +133,6 @@ under the License. test-jar test - diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..1bbef4d847c55aeabe7e4da6d4b2899214e7ceb2 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Deserialization schema from JSON to {@link Row}. + * + *

Deserializes the byte[] messages as a JSON object and reads + * the specified fields. + * + *

Failure during deserialization are forwarded as wrapped IOExceptions. + */ +public class JsonRowDeserializationSchema implements DeserializationSchema { + + /** Field names to parse. Indices match fieldTypes indices. */ + private final String[] fieldNames; + + /** Types to parse fields as. Indices match fieldNames indices. */ + private final TypeInformation[] fieldTypes; + + /** Object mapper for parsing the JSON. */ + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** Flag indicating whether to fail on a missing field. */ + private boolean failOnMissingField; + + /** + * Creates a JSON deserializtion schema for the given fields and type classes. + * + * @param fieldNames Names of JSON fields to parse. + * @param fieldTypes Type classes to parse JSON fields as. + */ + public JsonRowDeserializationSchema(String[] fieldNames, Class[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + + this.fieldTypes = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + /** + * Creates a JSON deserializtion schema for the given fields and types. + * + * @param fieldNames Names of JSON fields to parse. + * @param fieldTypes Types to parse JSON fields as. + */ + public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + try { + JsonNode root = objectMapper.readTree(message); + + Row row = new Row(fieldNames.length); + for (int i = 0; i < fieldNames.length; i++) { + JsonNode node = root.get(fieldNames[i]); + + if (node == null) { + if (failOnMissingField) { + throw new IllegalStateException("Failed to find field with name '" + + fieldNames[i] + "'."); + } else { + row.setField(i, null); + } + } else { + // Read the value as specified type + Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); + row.setField(i, value); + } + } + + return row; + } catch (Throwable t) { + throw new IOException("Failed to deserialize JSON object.", t); + } + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return new RowTypeInfo(fieldTypes, fieldNames); + } + + /** + * Configures the failure behaviour if a JSON field is missing. + * + *

By default, a missing field is ignored and the field is set to null. + * + * @param failOnMissingField Flag indicating whether to fail or not on a missing field. + */ + public void setFailOnMissingField(boolean failOnMissingField) { + this.failOnMissingField = failOnMissingField; + } + +} diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java new file mode 100644 index 0000000000000000000000000000000000000000..68225e2ae461fbad7e660121ac4b24855b9e0047 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JsonRowDeserializationSchemaTest { + + /** + * Tests simple deserialization. + */ + @Test + public void testDeserialization() throws Exception { + long id = 1238123899121L; + String name = "asdlkjasjkdla998y1122"; + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("id", id); + root.put("name", name); + root.put("bytes", bytes); + + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( + new String[] { "id", "name", "bytes" }, + new Class[] { Long.class, String.class, byte[].class }); + + Row deserialized = deserializationSchema.deserialize(serializedJson); + + assertEquals(3, deserialized.productArity()); + assertEquals(id, deserialized.productElement(0)); + assertEquals(name, deserialized.productElement(1)); + assertArrayEquals(bytes, (byte[]) deserialized.productElement(2)); + } + + /** + * Tests deserialization with non-existing field name. + */ + @Test + public void testMissingNode() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("id", 123123123); + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( + new String[] { "name" }, + new Class[] { String.class }); + + Row row = deserializationSchema.deserialize(serializedJson); + + assertEquals(1, row.productArity()); + assertNull("Missing field not null", row.productElement(0)); + + deserializationSchema.setFailOnMissingField(true); + + try { + deserializationSchema.deserialize(serializedJson); + fail("Did not throw expected Exception"); + } catch (IOException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + /** + * Tests that number of field names and types has to match. + */ + @Test + public void testNumberOfFieldNamesAndTypesMismatch() throws Exception { + try { + new JsonRowDeserializationSchema( + new String[] { "one", "two", "three" }, + new Class[] { Long.class }); + fail("Did not throw expected Exception"); + } catch (IllegalArgumentException ignored) { + // Expected + } + + try { + new JsonRowDeserializationSchema( + new String[] { "one" }, + new Class[] { Long.class, String.class }); + fail("Did not throw expected Exception"); + } catch (IllegalArgumentException ignored) { + // Expected + } + } +}