diff --git a/stratosphere-addons/avro/.gitignore b/stratosphere-addons/avro/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..4f02ef9c961fcd12ca3b0a3b1819efaafaa16f3a --- /dev/null +++ b/stratosphere-addons/avro/.gitignore @@ -0,0 +1 @@ +/src/test/java/eu/stratosphere/api/java/record/io/avro/generated/* \ No newline at end of file diff --git a/stratosphere-addons/avro/pom.xml b/stratosphere-addons/avro/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..45c9ee50eb7a5fc288f848ba5a92a5dfce394118 --- /dev/null +++ b/stratosphere-addons/avro/pom.xml @@ -0,0 +1,62 @@ + + + + 4.0.0 + + + stratosphere-addons + eu.stratosphere + 0.5-SNAPSHOT + .. + + + avro + avro + + jar + + + + eu.stratosphere + stratosphere-java + ${project.version} + + + + eu.stratosphere + stratosphere-clients + ${project.version} + + + + org.apache.avro + avro + 1.7.5 + + + + + + + org.apache.avro + avro-maven-plugin + 1.7.5 + + + generate-test-sources + + schema + + + ${project.basedir}/src/test/resources/avro + ${project.basedir}/src/test/java/ + + + + + + + + diff --git a/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/record/io/avro/AvroInputFormat.java b/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/record/io/avro/AvroInputFormat.java new file mode 100644 index 0000000000000000000000000000000000000000..b3fe377776a97af80a0e037b41d51cace292a820 --- /dev/null +++ b/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/record/io/avro/AvroInputFormat.java @@ -0,0 +1,188 @@ +package eu.stratosphere.api.java.record.io.avro; + +import java.io.IOException; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import eu.stratosphere.api.java.record.io.FileInputFormat; +import eu.stratosphere.core.fs.FileInputSplit; +import eu.stratosphere.core.fs.FileStatus; +import eu.stratosphere.core.fs.FileSystem; +import eu.stratosphere.core.fs.Path; +import eu.stratosphere.types.BooleanValue; +import eu.stratosphere.types.DoubleValue; +import eu.stratosphere.types.FloatValue; +import eu.stratosphere.types.IntValue; +import eu.stratosphere.types.LongValue; +import eu.stratosphere.types.NullValue; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; +import eu.stratosphere.types.Value; + + +/** + * Input format to read Avro files. + * + * The input format currently supports only flat avro schemas. So + * there is no support for complex types except for nullable + * primitve fields, e.g. ["string", null] + * (See http://avro.apache.org/docs/current/spec.html#schema_complex) + * + */ +public class AvroInputFormat extends FileInputFormat { + private static final Log LOG = LogFactory.getLog(AvroInputFormat.class); + + + private FileReader dataFileReader; + private GenericRecord reuseAvroRecord = null; + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + DatumReader datumReader = new GenericDatumReader(); + SeekableInput in = new FSDataInputStreamWrapper(stream, (int)split.getLength()); + LOG.info("Opening split "+split); + dataFileReader = DataFileReader.openReader(in, datumReader); + dataFileReader.sync(split.getStart()); + } + @Override + public boolean reachedEnd() throws IOException { + return !dataFileReader.hasNext(); + } + + @Override + public boolean nextRecord(Record record) throws IOException { + if(!dataFileReader.hasNext()) { + return false; + } + if(record == null){ + throw new IllegalArgumentException("Empty PactRecord given"); + } + reuseAvroRecord = dataFileReader.next(reuseAvroRecord); + final List fields = reuseAvroRecord.getSchema().getFields(); + for(Field field : fields) { + final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos())); + record.setField(field.pos(), value); + record.updateBinaryRepresenation(); + } + + return true; + } + /** + * Converts an Avro GenericRecord to a Value. + * @return + */ + private StringValue sString = new StringValue(); + private IntValue sInt = new IntValue(); + private BooleanValue sBool = new BooleanValue(); + private DoubleValue sDouble = new DoubleValue(); + private FloatValue sFloat = new FloatValue(); + private LongValue sLong = new LongValue(); + + private final Value convertAvroToPactValue(final Field field,final Object avroRecord) { + if(avroRecord == null) { + return null; + } + final Type type = checkTypeConstraintsAndGetType(field.schema()); + switch(type) { + case STRING: + sString.setValue((CharSequence) avroRecord); + return sString; + case INT: + sInt.setValue((Integer) avroRecord); + return sInt; + case BOOLEAN: + sBool.setValue((Boolean) avroRecord); + return sBool; + case DOUBLE: + sDouble.setValue((Double) avroRecord); + return sDouble; + case FLOAT: + sFloat.setValue((Float) avroRecord); + return sFloat; + case LONG: + sLong.setValue((Long) avroRecord); + return sLong; + case NULL: + return NullValue.getInstance(); + default: + throw new RuntimeException("Type "+type+" for AvroInputFormat is not implemented. Open an issue on GitHub."); + } + } + + private final Type checkTypeConstraintsAndGetType(final Schema schema) { + final Type type = schema.getType(); + if(type == Type.ARRAY || type == Type.ENUM || type == Type.RECORD || type == Type.MAP ) { + throw new RuntimeException("The given Avro file contains complex data types which are not supported right now"); + } + + if( type == Type.UNION) { + List types = schema.getTypes(); + if(types.size() > 2) { + throw new RuntimeException("The given Avro file contains a union that has more than two elements"); + } + if(types.size() == 1 && types.get(0).getType() != Type.UNION) { + return types.get(0).getType(); + } + if(types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION ) { + throw new RuntimeException("The given Avro file contains a nested union"); + } + if(types.get(0).getType() == Type.NULL) { + return types.get(1).getType(); + } else { + if(types.get(1).getType() != Type.NULL) { + throw new RuntimeException("The given Avro file is contains a union with two non-null types."); + } + return types.get(0).getType(); + } + } + return type; + } + + /** + * Set minNumSplits to number of files. + */ + @Override + public FileInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + int numAvroFiles = 0; + final Path path = this.filePath; + // get all the files that are involved in the splits + final FileSystem fs = path.getFileSystem(); + final FileStatus pathFile = fs.getFileStatus(path); + + if(!acceptFile(pathFile)) { + throw new IOException("The given file does not pass the file-filter"); + } + if (pathFile.isDir()) { + // input is directory. list all contained files + final FileStatus[] dir = fs.listStatus(path); + for (int i = 0; i < dir.length; i++) { + if (!dir[i].isDir() && acceptFile(dir[i])) { + numAvroFiles++; + } + } + } else { + numAvroFiles = 1; + } + return super.createInputSplits(numAvroFiles); + } + + // dirty hack. needs a fix! + private boolean acceptFile(FileStatus fileStatus) { + final String name = fileStatus.getPath().getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + +} diff --git a/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/record/io/avro/FSDataInputStreamWrapper.java b/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/record/io/avro/FSDataInputStreamWrapper.java new file mode 100644 index 0000000000000000000000000000000000000000..2b7422535eb33765d43f1279524d48c5a45dbfdd --- /dev/null +++ b/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/record/io/avro/FSDataInputStreamWrapper.java @@ -0,0 +1,52 @@ +package eu.stratosphere.api.java.record.io.avro; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.avro.file.SeekableInput; + +import eu.stratosphere.core.fs.FSDataInputStream; + + +/** + * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache + * licenced as well) + * + * The wrapper keeps track of the position! + * + */ +public class FSDataInputStreamWrapper implements Closeable, SeekableInput { + private final FSDataInputStream stream; + private final long len; + private long pos; + + FSDataInputStreamWrapper(final FSDataInputStream stream, final int len) { + this.stream = stream; + this.len = len; + this.pos = 0; + } + + public long length() { + return len; + } + + public int read(byte[] b, int off, int len) throws IOException { + int read; + read = stream.read(b, off, len); + pos += read; + return read; + } + + public void seek(long p) throws IOException { + stream.seek(p); + pos = p; + } + + public long tell() throws IOException { + return pos; + } + + public void close() throws IOException { + stream.close(); + } +} diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/record/io/avro/AvroInputFormatTest.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/record/io/avro/AvroInputFormatTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a92e6b00c4e1a7a79f9bf6a3fee3bc4e947a6489 --- /dev/null +++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/record/io/avro/AvroInputFormatTest.java @@ -0,0 +1,88 @@ +package eu.stratosphere.api.java.record.io.avro; + +import java.io.File; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import eu.stratosphere.api.java.record.io.avro.generated.User; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.FileInputSplit; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; + + +/** + * Test the avro input format. + * (The testcase is mostly the getting started tutorial of avro) + * http://avro.apache.org/docs/current/gettingstartedjava.html + * + */ +public class AvroInputFormatTest { + + private File testFile; + + private final AvroInputFormat format = new AvroInputFormat(); + final static String TEST_NAME = "Alyssa"; + @Before + public void createFiles() throws IOException { + testFile = File.createTempFile("AvroInputFormatTest", null); + User user1 = new User(); + user1.setName(TEST_NAME); + user1.setFavoriteNumber(256); + user1.setTypeDoubleTest(123.45d); + user1.setTypeBoolTest(true); + + // Construct via builder + User user2 = User.newBuilder() + .setName("Charlie") + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .build(); + DatumWriter userDatumWriter = new SpecificDatumWriter(User.class); + DataFileWriter dataFileWriter = new DataFileWriter(userDatumWriter); + dataFileWriter.create(user1.getSchema(), testFile); + dataFileWriter.append(user1); + dataFileWriter.append(user2); + dataFileWriter.close(); + } + + @Test + public void testDeserialisation() throws IOException { + Configuration parameters = new Configuration(); + format.setFilePath("file://"+testFile.getAbsolutePath()); + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + Assert.assertEquals(splits.length, 1); + format.open(splits[0]); + Record record = new Record(); + Assert.assertTrue(format.nextRecord(record)); + StringValue name = record.getField(0, StringValue.class); + Assert.assertNotNull("empty record", name); + Assert.assertEquals("name not equal",name.getValue(), TEST_NAME); + + Assert.assertFalse("expecting second element", format.reachedEnd()); + Assert.assertTrue("expecting second element", format.nextRecord(record)); + + Assert.assertFalse(format.nextRecord(record)); + Assert.assertTrue(format.reachedEnd()); + + format.close(); + } + + @After + public void deleteFiles() { + testFile.delete(); + } +} diff --git a/stratosphere-addons/avro/src/test/resources/avro/user.avsc b/stratosphere-addons/avro/src/test/resources/avro/user.avsc new file mode 100644 index 0000000000000000000000000000000000000000..6bf7d7393bded0e4d0e5b85c5dc54e902658c3b9 --- /dev/null +++ b/stratosphere-addons/avro/src/test/resources/avro/user.avsc @@ -0,0 +1,13 @@ +{"namespace": "eu.stratosphere.api.java.record.io.avro.generated", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "type_long_test", "type": ["long", "null"]}, + {"name": "type_double_test", "type": ["double"]}, + {"name": "type_null_test", "type": ["null"]}, + {"name": "type_bool_test", "type": ["boolean"]} + ] +} \ No newline at end of file diff --git a/stratosphere-addons/pom.xml b/stratosphere-addons/pom.xml index 7b41dec73f886d07c157fcc75ea6ccf1621f5938..4cf7b596b61c7ac8886c045470b532b3763a1ab1 100644 --- a/stratosphere-addons/pom.xml +++ b/stratosphere-addons/pom.xml @@ -17,7 +17,7 @@ array-datamodel - + avro jdbc spargel swt-visualization