提交 3ac080f0 编写于 作者: U Ufuk Celebi

[FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema

- Adds a deserialization schema from byte[] to Row to be used in conjunction
  with the Table API.
上级 63504a3c
......@@ -49,6 +49,16 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project
won't transitively depend on flink-table. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
......@@ -101,7 +111,6 @@ under the License.
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
......@@ -124,7 +133,6 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.util.serialization;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
 * Deserialization schema from JSON to {@link Row}.
 *
 * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
 * the specified fields.
 *
 * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
 */
public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {

	/** Field names to parse. Indices match fieldTypes indices. */
	private final String[] fieldNames;

	/** Types to parse fields as. Indices match fieldNames indices. */
	private final TypeInformation<?>[] fieldTypes;

	/** Object mapper for parsing the JSON. */
	private final ObjectMapper objectMapper = new ObjectMapper();

	/** Flag indicating whether to fail on a missing field. Defaults to false (missing fields become null). */
	private boolean failOnMissingField;

	/**
	 * Creates a JSON deserialization schema for the given fields and type classes.
	 *
	 * @param fieldNames Names of JSON fields to parse.
	 * @param fieldTypes Type classes to parse JSON fields as.
	 * @throws NullPointerException If fieldNames or fieldTypes is null.
	 * @throws IllegalArgumentException If the number of field names and types does not match.
	 */
	public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
		Preconditions.checkNotNull(fieldTypes, "Field type classes");

		// Validate counts before converting the classes, so a mismatch fails
		// fast with a clear message instead of after wasted extraction work.
		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
				"Number of provided field names and types does not match.");

		// Convert each class to its Flink type information eagerly, so
		// deserialize() can hand Jackson the concrete target class per field.
		this.fieldTypes = new TypeInformation[fieldTypes.length];
		for (int i = 0; i < fieldTypes.length; i++) {
			this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
		}
	}

	/**
	 * Creates a JSON deserialization schema for the given fields and types.
	 *
	 * @param fieldNames Names of JSON fields to parse.
	 * @param fieldTypes Types to parse JSON fields as.
	 * @throws NullPointerException If fieldNames or fieldTypes is null.
	 * @throws IllegalArgumentException If the number of field names and types does not match.
	 */
	public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");

		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
				"Number of provided field names and types does not match.");
	}

	/**
	 * Deserializes a JSON-encoded message into a {@link Row} with one field
	 * per configured field name.
	 *
	 * @param message JSON-encoded message bytes.
	 * @return Row with the parsed field values (missing fields are null unless
	 *         {@link #setFailOnMissingField(boolean)} was enabled).
	 * @throws IOException Wrapping any failure during parsing, including a
	 *         missing field when failOnMissingField is set.
	 */
	@Override
	public Row deserialize(byte[] message) throws IOException {
		try {
			JsonNode root = objectMapper.readTree(message);

			Row row = new Row(fieldNames.length);
			for (int i = 0; i < fieldNames.length; i++) {
				JsonNode node = root.get(fieldNames[i]);

				if (node == null) {
					if (failOnMissingField) {
						throw new IllegalStateException("Failed to find field with name '"
								+ fieldNames[i] + "'.");
					} else {
						row.setField(i, null);
					}
				} else {
					// Read the value as specified type
					Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
					row.setField(i, value);
				}
			}

			return row;
		} catch (Throwable t) {
			// Wrap everything (including the IllegalStateException above) so
			// callers only have to handle IOException, per the class contract.
			throw new IOException("Failed to deserialize JSON object.", t);
		}
	}

	@Override
	public boolean isEndOfStream(Row nextElement) {
		// JSON rows carry no end-of-stream marker; the stream is unbounded.
		return false;
	}

	@Override
	public TypeInformation<Row> getProducedType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	/**
	 * Configures the failure behaviour if a JSON field is missing.
	 *
	 * <p>By default, a missing field is ignored and the field is set to null.
	 *
	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
	 */
	public void setFailOnMissingField(boolean failOnMissingField) {
		this.failOnMissingField = failOnMissingField;
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Tests for {@link JsonRowDeserializationSchema}.
 */
public class JsonRowDeserializationSchemaTest {

	/**
	 * Tests simple deserialization.
	 */
	@Test
	public void testDeserialization() throws Exception {
		long id = 1238123899121L;
		String name = "asdlkjasjkdla998y1122";
		byte[] bytes = new byte[1024];
		ThreadLocalRandom.current().nextBytes(bytes);

		ObjectMapper mapper = new ObjectMapper();

		// Build the JSON document to deserialize
		ObjectNode document = mapper.createObjectNode();
		document.put("id", id);
		document.put("name", name);
		document.put("bytes", bytes);

		byte[] json = mapper.writeValueAsBytes(document);

		JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
				new String[] { "id", "name", "bytes" },
				new Class<?>[] { Long.class, String.class, byte[].class });

		Row row = schema.deserialize(json);

		// Each configured field must come back with its original value
		assertEquals(3, row.productArity());
		assertEquals(id, row.productElement(0));
		assertEquals(name, row.productElement(1));
		assertArrayEquals(bytes, (byte[]) row.productElement(2));
	}

	/**
	 * Tests deserialization with non-existing field name.
	 */
	@Test
	public void testMissingNode() throws Exception {
		ObjectMapper mapper = new ObjectMapper();

		// Document that lacks the field the schema expects
		ObjectNode document = mapper.createObjectNode();
		document.put("id", 123123123);
		byte[] json = mapper.writeValueAsBytes(document);

		JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
				new String[] { "name" },
				new Class<?>[] { String.class });

		// Default behaviour: the missing field is silently mapped to null
		Row row = schema.deserialize(json);

		assertEquals(1, row.productArity());
		assertNull("Missing field not null", row.productElement(0));

		// With strict mode enabled the same input must fail instead
		schema.setFailOnMissingField(true);

		try {
			schema.deserialize(json);
			fail("Did not throw expected Exception");
		} catch (IOException e) {
			assertTrue(e.getCause() instanceof IllegalStateException);
		}
	}

	/**
	 * Tests that number of field names and types has to match.
	 */
	@Test
	public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
		// More names than types
		try {
			new JsonRowDeserializationSchema(
					new String[] { "one", "two", "three" },
					new Class<?>[] { Long.class });
			fail("Did not throw expected Exception");
		} catch (IllegalArgumentException ignored) {
			// Expected
		}

		// More types than names
		try {
			new JsonRowDeserializationSchema(
					new String[] { "one" },
					new Class<?>[] { Long.class, String.class });
			fail("Did not throw expected Exception");
		} catch (IllegalArgumentException ignored) {
			// Expected
		}
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册