Commit 95882e28 authored by Robert Metzger

Created 'avro' addons subproject

Added 'AvroInputFormat' with test.
Parent 6fe66930
/src/test/java/eu/stratosphere/api/java/record/io/avro/generated/*
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>stratosphere-addons</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>avro</artifactId>
<name>avro</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.7.5</version>
<executions>
<execution>
<phase>generate-test-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/test/resources/avro</sourceDirectory>
<outputDirectory>${project.basedir}/src/test/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package eu.stratosphere.api.java.record.io.avro;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.record.io.FileInputFormat;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.fs.FileStatus;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.types.BooleanValue;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.FloatValue;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.NullValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
/**
* Input format to read Avro files.
*
* The input format currently supports only flat Avro schemas, so
* there is no support for complex types except for nullable
* primitive fields, e.g. ["string", null]
* (see http://avro.apache.org/docs/current/spec.html#schema_complex).
*
*/
public class AvroInputFormat extends FileInputFormat {
private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
private FileReader<GenericRecord> dataFileReader;
private GenericRecord reuseAvroRecord = null;
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
SeekableInput in = new FSDataInputStreamWrapper(stream, (int)split.getLength());
LOG.info("Opening split "+split);
dataFileReader = DataFileReader.openReader(in, datumReader);
dataFileReader.sync(split.getStart());
}
@Override
public boolean reachedEnd() throws IOException {
return !dataFileReader.hasNext();
}
@Override
public boolean nextRecord(Record record) throws IOException {
if(!dataFileReader.hasNext()) {
return false;
}
if(record == null){
throw new IllegalArgumentException("Empty PactRecord given");
}
reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
final List<Field> fields = reuseAvroRecord.getSchema().getFields();
for(Field field : fields) {
final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
record.setField(field.pos(), value);
record.updateBinaryRepresenation();
}
return true;
}
// Reusable mutable value objects, so that a new Value does not have to be allocated for every field.
private StringValue sString = new StringValue();
private IntValue sInt = new IntValue();
private BooleanValue sBool = new BooleanValue();
private DoubleValue sDouble = new DoubleValue();
private FloatValue sFloat = new FloatValue();
private LongValue sLong = new LongValue();
/**
* Converts a single field value of an Avro GenericRecord to a Stratosphere Value.
*
* @return the converted Value, or null if the Avro field value is null.
*/
private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
if(avroRecord == null) {
return null;
}
final Type type = checkTypeConstraintsAndGetType(field.schema());
switch(type) {
case STRING:
sString.setValue((CharSequence) avroRecord);
return sString;
case INT:
sInt.setValue((Integer) avroRecord);
return sInt;
case BOOLEAN:
sBool.setValue((Boolean) avroRecord);
return sBool;
case DOUBLE:
sDouble.setValue((Double) avroRecord);
return sDouble;
case FLOAT:
sFloat.setValue((Float) avroRecord);
return sFloat;
case LONG:
sLong.setValue((Long) avroRecord);
return sLong;
case NULL:
return NullValue.getInstance();
default:
throw new RuntimeException("Type "+type+" for AvroInputFormat is not implemented. Open an issue on GitHub.");
}
}
private final Type checkTypeConstraintsAndGetType(final Schema schema) {
final Type type = schema.getType();
if(type == Type.ARRAY || type == Type.ENUM || type == Type.RECORD || type == Type.MAP ) {
throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
}
if( type == Type.UNION) {
List<Schema> types = schema.getTypes();
if(types.size() > 2) {
throw new RuntimeException("The given Avro file contains a union that has more than two elements");
}
if(types.size() == 1 && types.get(0).getType() != Type.UNION) {
return types.get(0).getType();
}
if(types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION ) {
throw new RuntimeException("The given Avro file contains a nested union");
}
if(types.get(0).getType() == Type.NULL) {
return types.get(1).getType();
} else {
if(types.get(1).getType() != Type.NULL) {
throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
}
return types.get(0).getType();
}
}
return type;
}
/**
* Overrides minNumSplits with the number of Avro files, so that every file yields at least one split.
*/
@Override
public FileInputSplit[] createInputSplits(int minNumSplits)
throws IOException {
int numAvroFiles = 0;
final Path path = this.filePath;
// get all the files that are involved in the splits
final FileSystem fs = path.getFileSystem();
final FileStatus pathFile = fs.getFileStatus(path);
if(!acceptFile(pathFile)) {
throw new IOException("The given file does not pass the file-filter");
}
if (pathFile.isDir()) {
// input is directory. list all contained files
final FileStatus[] dir = fs.listStatus(path);
for (int i = 0; i < dir.length; i++) {
if (!dir[i].isDir() && acceptFile(dir[i])) {
numAvroFiles++;
}
}
} else {
numAvroFiles = 1;
}
return super.createInputSplits(numAvroFiles);
}
// dirty hack. needs a fix!
private boolean acceptFile(FileStatus fileStatus) {
final String name = fileStatus.getPath().getName();
return !name.startsWith("_") && !name.startsWith(".");
}
}
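For reference, the runtime (and the test further below) drives this input format through the following lifecycle: configure, createInputSplits, open, then alternating reachedEnd/nextRecord, and finally close. A minimal sketch of that loop outside of a job, using only the API introduced in this commit; the file path is a placeholder and error handling is omitted:

// Illustrative sketch only; not part of the commit. The path below is a placeholder.
AvroInputFormat format = new AvroInputFormat();
format.setFilePath("file:///path/to/some-file.avro");
format.configure(new Configuration());
for (FileInputSplit split : format.createInputSplits(1)) {
    format.open(split);
    Record record = new Record();
    while (!format.reachedEnd() && format.nextRecord(record)) {
        // field positions follow the Avro schema; e.g. read the first field as a StringValue
        StringValue firstField = record.getField(0, StringValue.class);
    }
    format.close();
}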
package eu.stratosphere.api.java.record.io.avro;
import java.io.Closeable;
import java.io.IOException;
import org.apache.avro.file.SeekableInput;
import eu.stratosphere.core.fs.FSDataInputStream;
/**
* Code copied from org.apache.avro.mapred.FSInput (which is Apache
* licensed as well).
*
* The wrapper keeps track of the current position in the stream.
*
*/
public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
private final FSDataInputStream stream;
private final long len;
private long pos;
FSDataInputStreamWrapper(final FSDataInputStream stream, final int len) {
this.stream = stream;
this.len = len;
this.pos = 0;
}
public long length() {
return len;
}
public int read(byte[] b, int off, int len) throws IOException {
final int read = stream.read(b, off, len);
// only advance the position if bytes were actually read (read() returns -1 at end of stream)
if (read > 0) {
pos += read;
}
return read;
}
public void seek(long p) throws IOException {
stream.seek(p);
pos = p;
}
public long tell() throws IOException {
return pos;
}
public void close() throws IOException {
stream.close();
}
}
package eu.stratosphere.api.java.record.io.avro;
import java.io.File;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import eu.stratosphere.api.java.record.io.avro.generated.User;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
/**
* Tests the Avro input format.
* (The test case mostly follows the Avro getting-started tutorial:
* http://avro.apache.org/docs/current/gettingstartedjava.html)
*
*/
public class AvroInputFormatTest {
private File testFile;
private final AvroInputFormat format = new AvroInputFormat();
final static String TEST_NAME = "Alyssa";
@Before
public void createFiles() throws IOException {
testFile = File.createTempFile("AvroInputFormatTest", null);
User user1 = new User();
user1.setName(TEST_NAME);
user1.setFavoriteNumber(256);
user1.setTypeDoubleTest(123.45d);
user1.setTypeBoolTest(true);
// Construct via builder
User user2 = User.newBuilder()
.setName("Charlie")
.setFavoriteColor("blue")
.setFavoriteNumber(null)
.setTypeBoolTest(false)
.setTypeDoubleTest(1.337d)
.setTypeNullTest(null)
.setTypeLongTest(1337L)
.build();
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), testFile);
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.close();
}
@Test
public void testDeserialisation() throws IOException {
Configuration parameters = new Configuration();
format.setFilePath("file://"+testFile.getAbsolutePath());
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(1);
Assert.assertEquals(1, splits.length);
format.open(splits[0]);
Record record = new Record();
Assert.assertTrue(format.nextRecord(record));
StringValue name = record.getField(0, StringValue.class);
Assert.assertNotNull("empty record", name);
Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
Assert.assertFalse("expecting second element", format.reachedEnd());
Assert.assertTrue("expecting second element", format.nextRecord(record));
Assert.assertFalse(format.nextRecord(record));
Assert.assertTrue(format.reachedEnd());
format.close();
}
@After
public void deleteFiles() {
testFile.delete();
}
}
{"namespace": "eu.stratosphere.api.java.record.io.avro.generated",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]},
{"name": "type_long_test", "type": ["long", "null"]},
{"name": "type_double_test", "type": ["double"]},
{"name": "type_null_test", "type": ["null"]},
{"name": "type_bool_test", "type": ["boolean"]}
]
}
\ No newline at end of file
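Given the schema above and the union handling in checkTypeConstraintsAndGetType, each User record read by the AvroInputFormat ends up as a Record whose field positions follow the schema order. Continuing the read-loop sketch shown earlier, the implied layout looks roughly like this (illustrative only; a null value inside a nullable union leaves the corresponding Record field null):

// Position-to-type mapping implied by the User schema:
StringValue  name           = record.getField(0, StringValue.class);   // "name": string
IntValue     favoriteNumber = record.getField(1, IntValue.class);      // "favorite_number": ["int", "null"], may be null
StringValue  favoriteColor  = record.getField(2, StringValue.class);   // "favorite_color": ["string", "null"], may be null
LongValue    typeLongTest   = record.getField(3, LongValue.class);     // "type_long_test": ["long", "null"], may be null
DoubleValue  typeDoubleTest = record.getField(4, DoubleValue.class);   // "type_double_test": ["double"]
// position 5: "type_null_test": ["null"] is always stored as NullValue.getInstance()
BooleanValue typeBoolTest   = record.getField(6, BooleanValue.class);  // "type_bool_test": ["boolean"]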
@@ -17,7 +17,7 @@
<modules>
<module>array-datamodel</module>
<!-- To come here: hbase-simple, jdbc, sequenceFile, avro -->
<module>avro</module>
<module>jdbc</module>
<module>spargel</module>
<module>swt-visualization</module>