Commit 626d6b78 authored by Stephan Ewen


[FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.
Parent 38e4755a
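The migration replaces the deprecated Record-based operators (GenericDataSource, MapOperator, ReduceOperator) with the typed DataSet API. A minimal sketch of the new-style read path, assembled from pieces this commit introduces below; the input path and the class name NewApiSketch are illustrative placeholders, not part of the commit:

import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;

public class NewApiSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Typed Avro input: the format yields User objects directly instead of
		// Record fields holding an AvroBaseValue wrapper.
		DataSet<User> users = env.createInput(
				new AvroInputFormat<User>(new Path("/tmp/users.avro"), User.class));

		users.print();
		env.execute();
	}
}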
......@@ -54,10 +54,11 @@ under the License.
<!-- version is derived from base module -->
</dependency>
<dependency>
	<groupId>org.apache.avro</groupId>
	<artifactId>avro-mapred</artifactId>
	<!-- version is derived from base module -->
</dependency>
<!-- guava needs to be in "provided" scope, so as not to re-export the dependency -->
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<scope>provided</scope>
</dependency>
<dependency>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.avro;
import java.io.IOException;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Key;
import org.apache.flink.util.ReflectionUtil;
public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
private static final long serialVersionUID = 1L;
public AvroBaseValue() {}
public AvroBaseValue(T datum) {
super(datum);
}
// --------------------------------------------------------------------------------------------
// Serialization / Deserialization
// --------------------------------------------------------------------------------------------
private ReflectDatumWriter<T> writer;
private ReflectDatumReader<T> reader;
private DataOutputEncoder encoder;
private DataInputDecoder decoder;
@Override
public void write(DataOutputView out) throws IOException {
// the null flag
if (datum() == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
DataOutputEncoder encoder = getEncoder();
encoder.setOut(out);
getWriter().write(datum(), encoder);
}
}
@Override
public void read(DataInputView in) throws IOException {
// the null flag
if (in.readBoolean()) {
DataInputDecoder decoder = getDecoder();
decoder.setIn(in);
datum(getReader().read(datum(), decoder));
}
}
private ReflectDatumWriter<T> getWriter() {
if (this.writer == null) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) datum().getClass();
this.writer = new ReflectDatumWriter<T>(clazz);
}
return this.writer;
}
private ReflectDatumReader<T> getReader() {
if (this.reader == null) {
Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
this.reader = new ReflectDatumReader<T>(datumClass);
}
return this.reader;
}
private DataOutputEncoder getEncoder() {
if (this.encoder == null) {
this.encoder = new DataOutputEncoder();
}
return this.encoder;
}
private DataInputDecoder getDecoder() {
if (this.decoder == null) {
this.decoder = new DataInputDecoder();
}
return this.decoder;
}
// --------------------------------------------------------------------------------------------
// Hashing / Equality
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return datum() == null ? 0 : datum().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj.getClass() == this.getClass()) {
Object otherDatum = ((AvroBaseValue<?>) obj).datum();
Object thisDatum = datum();
if (thisDatum == null) {
return otherDatum == null;
} else {
return thisDatum.equals(otherDatum);
}
} else {
return false;
}
}
@Override
public String toString() {
return "AvroBaseValue (" + datum() + ")";
}
@SuppressWarnings("unchecked")
@Override
public int compareTo(AvroBaseValue<T> o) {
Object otherDatum = o.datum();
Object thisDatum = datum();
if (thisDatum == null) {
return otherDatum == null ? 0 : -1;
} else {
return otherDatum == null ? 1 : ((Comparable<Object>) thisDatum).compareTo(otherDatum);
}
}
}
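Concrete key types bind the datum type when subclassing, which is what lets getReader() resolve the datum class via ReflectionUtil.getTemplateType1(getClass()). A minimal sketch of such a wrapper, mirroring the SUser and SBookAvroValue classes further down in this diff; the Book type is an assumed reflect-style POJO:

// The nullary constructor is required so the runtime can create instances
// via InstantiationUtil; the type parameter must be bound by the subclass.
public class SBook extends AvroBaseValue<Book> {
	private static final long serialVersionUID = 1L;

	public SBook() {}

	public SBook(Book datum) {
		super(datum);
	}
}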
......@@ -16,90 +16,68 @@
* limitations under the License.
*/
package org.apache.flink.api.java.record.io.avro.example;
package org.apache.flink.api.io.avro.example;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Random;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.GenericInputFormat;
import org.apache.flink.api.java.record.operators.GenericDataSink;
import org.apache.flink.api.java.record.operators.GenericDataSource;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.client.LocalExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class ReflectiveAvroTypeExample {
@SuppressWarnings("serial")
public class AvroTypeExample {
public static void main(String[] args) throws Exception {
GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
.input(source).name("le mapper").build();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
.input(mapper).name("le reducer").build();
DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
users
.map(new NumberExtractingMapper())
.groupBy(1)
.reduceGroup(new ConcatenatingReducer())
.print();
Plan p = new Plan(sink);
p.setDefaultParallelism(4);
LocalExecutor.execute(p);
env.execute();
}
public static final class NumberExtractingMapper extends MapFunction implements Serializable {
private static final long serialVersionUID = 1L;
public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
@Override
public void map(Record record, Collector<Record> out) throws Exception {
User u = record.getField(0, SUser.class).datum();
record.setField(1, new IntValue(u.getFavoriteNumber()));
out.collect(record);
public Tuple2<User, Integer> map(User user) {
return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
}
}
public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
private static final long serialVersionUID = 1L;
private final Record result = new Record(2);
public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
Record r = records.next();
public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
int number = 0;
StringBuilder colors = new StringBuilder();
int num = r.getField(1, IntValue.class).getValue();
String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
while (records.hasNext()) {
r = records.next();
names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
for (Tuple2<User, Integer> u : values) {
number = u.f1;
colors.append(u.f0.getFavoriteColor()).append(" - ");
}
result.setField(0, new IntValue(num));
result.setField(1, new StringValue(names));
out.collect(result);
colors.setLength(colors.length() - 3);
out.collect(new Tuple2<Integer, String>(number, colors.toString()));
}
}
public static final class UserGeneratingInputFormat extends GenericInputFormat {
public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
private static final long serialVersionUID = 1L;
......@@ -120,42 +98,14 @@ public class ReflectiveAvroTypeExample {
}
@Override
public Record nextRecord(Record record) throws IOException {
public User nextRecord(User reuse) throws IOException {
count++;
User u = new User();
u.setName(NAMES[rnd.nextInt(NAMES.length)]);
u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
u.setFavoriteNumber(rnd.nextInt(87));
SUser su = new SUser();
su.datum(u);
record.setField(0, su);
return record;
}
}
public static final class PrintingOutputFormat implements OutputFormat<Record> {
private static final long serialVersionUID = 1L;
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) throws IOException {}
@Override
public void writeRecord(Record record) throws IOException {
int color = record.getField(0, IntValue.class).getValue();
String names = record.getField(1, StringValue.class).getValue();
System.out.println(color + ": " + names);
return u;
}
@Override
public void close() throws IOException {}
}
}
......@@ -22,7 +22,7 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.flink.api.java.record.io.avro.example;
package org.apache.flink.api.io.avro.example;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
......@@ -115,18 +115,18 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Creates a new User RecordBuilder */
public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
return new org.apache.flink.api.io.avro.example.User.Builder();
}
/** Creates a new User RecordBuilder by copying an existing Builder */
public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
return new org.apache.flink.api.io.avro.example.User.Builder(other);
}
/** Creates a new User RecordBuilder by copying an existing User instance */
public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
return new org.apache.flink.api.io.avro.example.User.Builder(other);
}
/**
......@@ -141,11 +141,11 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
/** Creates a new Builder */
private Builder() {
super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
super(other);
if (isValidValue(fields()[0], other.name)) {
this.name = data().deepCopy(fields()[0].schema(), other.name);
......@@ -162,8 +162,8 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Creates a Builder by copying an existing User instance */
private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
private Builder(org.apache.flink.api.io.avro.example.User other) {
super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
if (isValidValue(fields()[0], other.name)) {
this.name = data().deepCopy(fields()[0].schema(), other.name);
fieldSetFlags()[0] = true;
......@@ -184,7 +184,7 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Sets the value of the 'name' field */
public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
validate(fields()[0], value);
this.name = value;
fieldSetFlags()[0] = true;
......@@ -197,7 +197,7 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Clears the value of the 'name' field */
public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
public org.apache.flink.api.io.avro.example.User.Builder clearName() {
name = null;
fieldSetFlags()[0] = false;
return this;
......@@ -209,7 +209,7 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Sets the value of the 'favorite_number' field */
public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
validate(fields()[1], value);
this.favorite_number = value;
fieldSetFlags()[1] = true;
......@@ -222,7 +222,7 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Clears the value of the 'favorite_number' field */
public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
favorite_number = null;
fieldSetFlags()[1] = false;
return this;
......@@ -234,7 +234,7 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Sets the value of the 'favorite_color' field */
public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
validate(fields()[2], value);
this.favorite_color = value;
fieldSetFlags()[2] = true;
......@@ -247,7 +247,7 @@ public class User extends org.apache.avro.specific.SpecificRecordBase implements
}
/** Clears the value of the 'favorite_color' field */
public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
favorite_color = null;
fieldSetFlags()[2] = false;
return this;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.record.io.avro;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.AvroBaseValue;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.java.record.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.types.Record;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.ReflectionUtil;
public class AvroInputFormat<E> extends FileInputFormat {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
private final Class<E> avroValueType;
private transient FileReader<E> dataFileReader;
private transient E reuseAvroValue;
private transient AvroBaseValue<E> wrapper;
public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
this.avroWrapperTypeClass = wrapperClass;
this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
this.unsplittable = true;
}
public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
this.avroValueType = avroType;
this.avroWrapperTypeClass = wrapperClass;
this.unsplittable = true;
}
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
DatumReader<E> datumReader;
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
datumReader = new SpecificDatumReader<E>(avroValueType);
} else {
datumReader = new ReflectDatumReader<E>(avroValueType);
}
LOG.info("Opening split " + split);
SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
dataFileReader = DataFileReader.openReader(in, datumReader);
dataFileReader.sync(split.getStart());
reuseAvroValue = null;
}
@Override
public boolean reachedEnd() throws IOException {
return !dataFileReader.hasNext();
}
@Override
public Record nextRecord(Record record) throws IOException {
if (!dataFileReader.hasNext()) {
return null;
}
reuseAvroValue = dataFileReader.next(reuseAvroValue);
wrapper.datum(reuseAvroValue);
record.setField(0, wrapper);
return record;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.record.io.avro;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.java.record.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.ListValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.MapValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
/**
 * Input format to read Avro files.
 *
 * The input format currently supports only flat Avro schemas. There is no
 * support for complex types except for nullable primitive fields, e.g.
 * ["string", null] (see
 * http://avro.apache.org/docs/current/spec.html#schema_complex).
 */
public class AvroRecordInputFormat extends FileInputFormat {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
private FileReader<GenericRecord> dataFileReader;
private GenericRecord reuseAvroRecord = null;
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
LOG.info("Opening split " + split);
dataFileReader = DataFileReader.openReader(in, datumReader);
dataFileReader.sync(split.getStart());
}
@Override
public boolean reachedEnd() throws IOException {
return !dataFileReader.hasNext();
}
@Override
public Record nextRecord(Record record) throws IOException {
if (!dataFileReader.hasNext()) {
return null;
}
if (record == null) {
throw new IllegalArgumentException("Empty PactRecord given");
}
reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
final List<Field> fields = reuseAvroRecord.getSchema().getFields();
for (Field field : fields) {
final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
record.setField(field.pos(), value);
record.updateBinaryRepresenation();
}
return record;
}
@SuppressWarnings("unchecked")
private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
if (avroRecord == null) {
return null;
}
final Type type = checkTypeConstraintsAndGetType(field.schema());
// check for complex types
// (complex type FIXED is not yet supported)
switch (type) {
case ARRAY:
final Type elementType = field.schema().getElementType().getType();
final List<?> avroList = (List<?>) avroRecord;
return convertAvroArrayToListValue(elementType, avroList);
case ENUM:
final List<String> symbols = field.schema().getEnumSymbols();
final String avroRecordString = avroRecord.toString();
if (!symbols.contains(avroRecordString)) {
throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
}
sString.setValue(avroRecordString);
return sString;
case MAP:
final Type valueType = field.schema().getValueType().getType();
final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
return convertAvroMapToMapValue(valueType, avroMap);
// primitive type
default:
return convertAvroPrimitiveToValue(type, avroRecord);
}
}
private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
switch (elementType) {
case STRING:
StringListValue sl = new StringListValue();
for (Object item : avroList) {
sl.add(new StringValue((CharSequence) item));
}
return sl;
case INT:
IntListValue il = new IntListValue();
for (Object item : avroList) {
il.add(new IntValue((Integer) item));
}
return il;
case BOOLEAN:
BooleanListValue bl = new BooleanListValue();
for (Object item : avroList) {
bl.add(new BooleanValue((Boolean) item));
}
return bl;
case DOUBLE:
DoubleListValue dl = new DoubleListValue();
for (Object item : avroList) {
dl.add(new DoubleValue((Double) item));
}
return dl;
case FLOAT:
FloatListValue fl = new FloatListValue();
for (Object item : avroList) {
fl.add(new FloatValue((Float) item));
}
return fl;
case LONG:
LongListValue ll = new LongListValue();
for (Object item : avroList) {
ll.add(new LongValue((Long) item));
}
return ll;
default:
throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
}
}
private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
switch (mapValueType) {
case STRING:
StringMapValue sm = new StringMapValue();
for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((String) entry.getValue()));
}
return sm;
case INT:
IntMapValue im = new IntMapValue();
for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
}
return im;
case BOOLEAN:
BooleanMapValue bm = new BooleanMapValue();
for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
}
return bm;
case DOUBLE:
DoubleMapValue dm = new DoubleMapValue();
for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
}
return dm;
case FLOAT:
FloatMapValue fm = new FloatMapValue();
for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
}
return fm;
case LONG:
LongMapValue lm = new LongMapValue();
for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
}
return lm;
default:
throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
}
}
private StringValue sString = new StringValue();
private IntValue sInt = new IntValue();
private BooleanValue sBool = new BooleanValue();
private DoubleValue sDouble = new DoubleValue();
private FloatValue sFloat = new FloatValue();
private LongValue sLong = new LongValue();
private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
switch (type) {
case STRING:
sString.setValue((CharSequence) avroRecord);
return sString;
case INT:
sInt.setValue((Integer) avroRecord);
return sInt;
case BOOLEAN:
sBool.setValue((Boolean) avroRecord);
return sBool;
case DOUBLE:
sDouble.setValue((Double) avroRecord);
return sDouble;
case FLOAT:
sFloat.setValue((Float) avroRecord);
return sFloat;
case LONG:
sLong.setValue((Long) avroRecord);
return sLong;
case NULL:
return NullValue.getInstance();
default:
throw new RuntimeException(
		"Type " + type + " for AvroRecordInputFormat is not implemented. Open an issue on GitHub.");
}
}
private final Type checkTypeConstraintsAndGetType(final Schema schema) {
final Type type = schema.getType();
if (type == Type.RECORD) {
throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
}
if (type == Type.UNION) {
List<Schema> types = schema.getTypes();
if (types.size() > 2) {
throw new RuntimeException("The given Avro file contains a union that has more than two elements");
}
if (types.size() == 1 && types.get(0).getType() != Type.UNION) {
return types.get(0).getType();
}
if (types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION) {
throw new RuntimeException("The given Avro file contains a nested union");
}
if (types.get(0).getType() == Type.NULL) {
return types.get(1).getType();
} else {
if (types.get(1).getType() != Type.NULL) {
throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
}
return types.get(0).getType();
}
}
return type;
}
/**
* Sets minNumSplits to the number of Avro files.
*/
@Override
public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
int numAvroFiles = 0;
final Path path = this.filePath;
// get all the files that are involved in the splits
final FileSystem fs = path.getFileSystem();
final FileStatus pathFile = fs.getFileStatus(path);
if (!acceptFile(pathFile)) {
throw new IOException("The given file does not pass the file-filter");
}
if (pathFile.isDir()) {
// input is directory. list all contained files
final FileStatus[] dir = fs.listStatus(path);
for (int i = 0; i < dir.length; i++) {
if (!dir[i].isDir() && acceptFile(dir[i])) {
numAvroFiles++;
}
}
} else {
numAvroFiles = 1;
}
return super.createInputSplits(numAvroFiles);
}
// --------------------------------------------------------------------------------------------
// Concrete subclasses of ListValue and MapValue for all possible primitive types
// --------------------------------------------------------------------------------------------
public static class StringListValue extends ListValue<StringValue> {
private static final long serialVersionUID = 1L;
}
public static class IntListValue extends ListValue<IntValue> {
private static final long serialVersionUID = 1L;
}
public static class BooleanListValue extends ListValue<BooleanValue> {
private static final long serialVersionUID = 1L;
}
public static class DoubleListValue extends ListValue<DoubleValue> {
private static final long serialVersionUID = 1L;
}
public static class FloatListValue extends ListValue<FloatValue> {
private static final long serialVersionUID = 1L;
}
public static class LongListValue extends ListValue<LongValue> {
private static final long serialVersionUID = 1L;
}
public static class StringMapValue extends MapValue<StringValue, StringValue> {
private static final long serialVersionUID = 1L;
}
public static class IntMapValue extends MapValue<StringValue, IntValue> {
private static final long serialVersionUID = 1L;
}
public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
private static final long serialVersionUID = 1L;
}
public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
private static final long serialVersionUID = 1L;
}
public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
private static final long serialVersionUID = 1L;
}
public static class LongMapValue extends MapValue<StringValue, LongValue> {
private static final long serialVersionUID = 1L;
}
}
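To make the flat-schema constraint concrete: checkTypeConstraintsAndGetType accepts a union only if it has at most two branches, none of them nested unions, and at most one non-null type. A sketch with two assumed schemas, one accepted and one rejected; only Avro's Schema.Parser is used:

import org.apache.avro.Schema;

public class FlatSchemaSketch {
	public static void main(String[] args) {
		// Accepted: the only union is a nullable primitive, ["string", null].
		Schema ok = new Schema.Parser().parse(
			"{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
			+ "{\"name\":\"name\",\"type\":\"string\"},"
			+ "{\"name\":\"nickname\",\"type\":[\"string\",\"null\"]}]}");
		System.out.println(ok.getField("nickname").schema().getTypes()); // [string, null]

		// Rejected: a union of two non-null types makes the format throw
		// the "union with two non-null types" error above.
		Schema bad = new Schema.Parser().parse(
			"{\"type\":\"record\",\"name\":\"P2\",\"fields\":["
			+ "{\"name\":\"id\",\"type\":[\"string\",\"int\"]}]}");
		System.out.println(bad.getField("id").schema().getTypes()); // [string, int]
	}
}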
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.record.io.avro.example;
import org.apache.flink.api.avro.AvroBaseValue;
public class SUser extends AvroBaseValue<User> {
private static final long serialVersionUID = 1L;
}
......@@ -28,11 +28,11 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.record.io.avro.example.User;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.avro;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.reflect.Nullable;
import org.apache.flink.api.avro.AvroBaseValue;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.io.GenericInputFormat;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.GenericDataSink;
import org.apache.flink.api.java.record.operators.GenericDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
new RandomInputFormat(true));
GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
new RandomInputFormat(false));
CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
.input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
Plan plan = new Plan(sink, "CoGroper Test Plan");
plan.setDefaultParallelism(1);
return plan;
}
public static class SBookAvroValue extends AvroBaseValue<Book> {
private static final long serialVersionUID = 1L;
public SBookAvroValue() {}
public SBookAvroValue(Book datum) {
super(datum);
}
}
public static class Book {
long bookId;
@Nullable
String title;
long authorId;
public Book() {
}
public Book(long bookId, String title, long authorId) {
this.bookId = bookId;
this.title = title;
this.authorId = authorId;
}
}
public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
private static final long serialVersionUID = 1L;
public SBookAuthorValue() {}
public SBookAuthorValue(BookAuthor datum) {
super(datum);
}
}
public static class BookAuthor {
enum BookType {
book,
article,
journal
}
long authorId;
@Nullable
List<String> bookTitles;
@Nullable
List<Book> books;
String authorName;
BookType bookType;
public BookAuthor() {}
public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
this.authorId = authorId;
this.bookTitles = bookTitles;
this.authorName = authorName;
}
}
public static class RandomInputFormat extends GenericInputFormat {
private static final long serialVersionUID = 1L;
private final boolean isBook;
private boolean touched = false;
public RandomInputFormat(boolean isBook) {
this.isBook = isBook;
}
@Override
public boolean reachedEnd() throws IOException {
return touched;
}
@Override
public Record nextRecord(Record record) throws IOException {
touched = true;
record.setField(0, new LongValue(26382648));
if (isBook) {
Book b = new Book(123, "This is a test book", 26382648);
record.setField(1, new SBookAvroValue(b));
} else {
List<String> titles = new ArrayList<String>();
// titles.add("Title1");
// titles.add("Title2");
// titles.add("Title3");
List<Book> books = new ArrayList<Book>();
books.add(new Book(123, "This is a test book", 1));
books.add(new Book(24234234, "This is a test book", 1));
books.add(new Book(1234324, "This is a test book", 3));
BookAuthor a = new BookAuthor(1, titles, "Test Author");
a.books = books;
a.bookType = BookAuthor.BookType.journal;
record.setField(1, new SBookAuthorValue(a));
}
return record;
}
}
public static final class PrintingOutputFormat implements OutputFormat<Record> {
private static final long serialVersionUID = 1L;
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) {}
@Override
public void writeRecord(Record record) throws IOException {
long key = record.getField(0, LongValue.class).getValue();
String val = record.getField(1, StringValue.class).getValue();
System.out.println(key + " : " + val);
}
@Override
public void close() {}
}
public static class MyCoGrouper extends CoGroupFunction {
private static final long serialVersionUID = 1L;
@Override
public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
Record r1 = null;
while (records1.hasNext()) {
r1 = records1.next();
}
Record r2 = null;
while (records2.hasNext()) {
r2 = records2.next();
}
if (r1 != null) {
r1.getField(1, SBookAvroValue.class).datum();
}
if (r2 != null) {
r2.getField(1, SBookAuthorValue.class).datum();
}
}
}
}
......@@ -31,8 +31,8 @@ import java.util.Random;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.java.record.io.avro.generated.Colors;
import org.apache.flink.api.java.record.io.avro.generated.User;
import org.apache.flink.api.io.avro.generated.Colors;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.util.StringUtils;
import org.junit.Test;
......
......@@ -39,7 +39,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.avro.AvroBaseValue;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -125,18 +124,6 @@ public class AvroExternalJarProgram {
}
}
public static final class SUser extends AvroBaseValue<MyUser> {
static final long serialVersionUID = 1L;
public SUser() {}
public SUser(MyUser u) {
super(u);
}
}
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
......
......@@ -16,33 +16,31 @@
* limitations under the License.
*/
package org.apache.flink.api.io.avro;
package org.apache.flink.api.java.record.io.avro;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.Assert;
import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
import org.apache.flink.api.java.record.io.avro.generated.Colors;
import org.apache.flink.api.java.record.io.avro.generated.User;
import org.apache.avro.util.Utf8;
import org.apache.flink.api.io.avro.generated.Colors;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.core.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests the Avro input format.
* (The test case is mostly based on the Avro getting-started tutorial.)
......@@ -52,7 +50,6 @@ public class AvroRecordInputFormatTest {
private File testFile;
private final AvroRecordInputFormat format = new AvroRecordInputFormat();
final static String TEST_NAME = "Alyssa";
final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
......@@ -63,9 +60,9 @@ public class AvroRecordInputFormatTest {
final static Colors TEST_ENUM_COLOR = Colors.GREEN;
final static CharSequence TEST_MAP_KEY1 = "KEY 1";
final static String TEST_MAP_KEY1 = "KEY 1";
final static long TEST_MAP_VALUE1 = 8546456L;
final static CharSequence TEST_MAP_KEY2 = "KEY 2";
final static String TEST_MAP_KEY2 = "KEY 2";
final static long TEST_MAP_VALUE2 = 17554L;
@Before
......@@ -94,7 +91,7 @@ public class AvroRecordInputFormatTest {
user1.setTypeArrayBoolean(booleanArray);
user1.setTypeEnum(TEST_ENUM_COLOR);
user1.setTypeMap(longMap);
// Construct via builder
User user2 = User.newBuilder()
.setName("Charlie")
......@@ -121,41 +118,44 @@ public class AvroRecordInputFormatTest {
@Test
public void testDeserialisation() throws IOException {
Configuration parameters = new Configuration();
format.setFilePath(testFile.toURI().toString());
AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(1);
Assert.assertEquals(splits.length, 1);
assertEquals(splits.length, 1);
format.open(splits[0]);
Record record = new Record();
Assert.assertNotNull(format.nextRecord(record));
StringValue name = record.getField(0, StringValue.class);
Assert.assertNotNull("empty record", name);
Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
User u = format.nextRecord(null);
assertNotNull(u);
String name = u.getName().toString();
assertNotNull("empty record", name);
assertEquals("name not equal", TEST_NAME, name);
// check arrays
StringListValue sl = record.getField(7, AvroRecordInputFormat.StringListValue.class);
Assert.assertEquals("element 0 not equal", sl.get(0).getValue(), TEST_ARRAY_STRING_1);
Assert.assertEquals("element 1 not equal", sl.get(1).getValue(), TEST_ARRAY_STRING_2);
List<CharSequence> sl = u.getTypeArrayString();
assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
BooleanListValue bl = record.getField(8, AvroRecordInputFormat.BooleanListValue.class);
Assert.assertEquals("element 0 not equal", bl.get(0).getValue(), TEST_ARRAY_BOOLEAN_1);
Assert.assertEquals("element 1 not equal", bl.get(1).getValue(), TEST_ARRAY_BOOLEAN_2);
List<Boolean> bl = u.getTypeArrayBoolean();
assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
// check enums
StringValue enumValue = record.getField(10, StringValue.class);
Assert.assertEquals("string representation of enum not equal", enumValue.getValue(), TEST_ENUM_COLOR.toString());
Colors enumValue = u.getTypeEnum();
assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
// check maps
LongMapValue lm = record.getField(11, AvroRecordInputFormat.LongMapValue.class);
Assert.assertEquals("map value of key 1 not equal", lm.get(new StringValue(TEST_MAP_KEY1)).getValue(), TEST_MAP_VALUE1);
Assert.assertEquals("map value of key 2 not equal", lm.get(new StringValue(TEST_MAP_KEY2)).getValue(), TEST_MAP_VALUE2);
Map<CharSequence, Long> lm = u.getTypeMap();
assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
Assert.assertFalse("expecting second element", format.reachedEnd());
Assert.assertNotNull("expecting second element", format.nextRecord(record));
assertFalse("expecting second element", format.reachedEnd());
assertNotNull("expecting second element", format.nextRecord(u));
Assert.assertNull(format.nextRecord(record));
Assert.assertTrue(format.reachedEnd());
assertNull(format.nextRecord(u));
assertTrue(format.reachedEnd());
format.close();
}
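One detail in the migrated assertions above: Avro deserializes string map keys as org.apache.avro.util.Utf8, not java.lang.String, which is why the lookups wrap the keys in new Utf8(...). A self-contained illustration of that pitfall, using the key and value from the test constants:

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.util.Utf8;

public class Utf8KeySketch {
	public static void main(String[] args) {
		Map<CharSequence, Long> typeMap = new HashMap<CharSequence, Long>();
		typeMap.put(new Utf8("KEY 1"), 8546456L);

		System.out.println(typeMap.get("KEY 1"));           // null: a String never equals a Utf8 key
		System.out.println(typeMap.get(new Utf8("KEY 1"))); // 8546456
	}
}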
......
......@@ -22,11 +22,11 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.flink.api.java.record.io.avro.generated;
package org.apache.flink.api.io.avro.generated;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public enum Colors {
RED, GREEN, BLUE ;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.reflect.Nullable;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.junit.Test;
public class AvroSerializerEmptyArrayTest {
@Test
public void testBookSerialization() {
try {
Book b = new Book(123, "This is a test book", 26382648);
AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
test.testAll();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSerialization() {
try {
List<String> titles = new ArrayList<String>();
List<Book> books = new ArrayList<Book>();
books.add(new Book(123, "This is a test book", 1));
books.add(new Book(24234234, "This is a test book", 1));
books.add(new Book(1234324, "This is a test book", 3));
BookAuthor a = new BookAuthor(1, titles, "Test Author");
a.books = books;
a.bookType = BookAuthor.BookType.journal;
AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
test.testAll();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
public static class Book {
long bookId;
@Nullable
String title;
long authorId;
public Book() {}
public Book(long bookId, String title, long authorId) {
this.bookId = bookId;
this.title = title;
this.authorId = authorId;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (authorId ^ (authorId >>> 32));
result = prime * result + (int) (bookId ^ (bookId >>> 32));
result = prime * result + ((title == null) ? 0 : title.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Book other = (Book) obj;
if (authorId != other.authorId)
return false;
if (bookId != other.bookId)
return false;
if (title == null) {
if (other.title != null)
return false;
} else if (!title.equals(other.title))
return false;
return true;
}
}
public static class BookAuthor {
enum BookType {
book,
article,
journal
}
long authorId;
@Nullable
List<String> bookTitles;
@Nullable
List<Book> books;
String authorName;
BookType bookType;
public BookAuthor() {}
public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
this.authorId = authorId;
this.bookTitles = bookTitles;
this.authorName = authorName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (authorId ^ (authorId >>> 32));
result = prime * result + ((authorName == null) ? 0 : authorName.hashCode());
result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode());
result = prime * result + ((bookType == null) ? 0 : bookType.hashCode());
result = prime * result + ((books == null) ? 0 : books.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
BookAuthor other = (BookAuthor) obj;
if (authorId != other.authorId)
return false;
if (authorName == null) {
if (other.authorName != null)
return false;
} else if (!authorName.equals(other.authorName))
return false;
if (bookTitles == null) {
if (other.bookTitles != null)
return false;
} else if (!bookTitles.equals(other.bookTitles))
return false;
if (bookType != other.bookType)
return false;
if (books == null) {
if (other.books != null)
return false;
} else if (!books.equals(other.books))
return false;
return true;
}
}
}
......@@ -23,5 +23,5 @@ under the License.
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]java[\\/]record[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
</suppressions>
\ No newline at end of file