提交 a77d7520 编写于 作者: T Till Rohrmann

Added binary input and output formats which use the objects' TypeSerializer to serialize them.

Added InputTypeConfigurable to TypeSerializerOutputFormat to support a more seamless integration.

This closes #218.
上级 8af04738
......@@ -35,7 +35,6 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.util.StringUtils;
......@@ -44,7 +43,7 @@ import org.apache.flink.util.StringUtils;
* Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without
* configuration, these block sizes equal the native block sizes of the HDFS.
*/
public abstract class BinaryInputFormat<T extends IOReadableWritable> extends FileInputFormat<T> {
public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
private static final long serialVersionUID = 1L;
/**
......@@ -188,7 +187,7 @@ public abstract class BinaryInputFormat<T extends IOReadableWritable> extends Fi
return this.createInputSplits(0);
}
protected BlockInfo createBlockInfo() {
public BlockInfo createBlockInfo() {
return new BlockInfo();
}
......
......@@ -24,12 +24,10 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
public abstract class BinaryOutputFormat<T extends IOReadableWritable> extends FileOutputFormat<T> {
public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
private static final long serialVersionUID = 1L;
/**
......
......@@ -28,12 +28,13 @@ import org.apache.flink.core.memory.DataOutputView;
*
* @see SerializedInputFormat
*/
public class SerializedOutputFormat extends BinaryOutputFormat<IOReadableWritable> {
public class SerializedOutputFormat<T extends IOReadableWritable> extends
BinaryOutputFormat<T> {
private static final long serialVersionUID = 1L;
@Override
protected void serialize(IOReadableWritable record, DataOutputView dataOutputView) throws IOException {
protected void serialize(T record, DataOutputView dataOutputView) throws IOException {
record.write(dataOutputView);
}
}
......@@ -31,24 +31,18 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Tests {@link SerializedInputFormat} and {@link SerializedOutputFormat}.
* Test base for {@link org.apache.flink.api.common.io.BinaryInputFormat} and {@link org.apache.flink.api.common.io.BinaryOutputFormat}.
*/
@RunWith(Parameterized.class)
public class SequentialFormatTest {
public abstract class SequentialFormatTestBase<T> {
public class InputSplitSorter implements Comparator<FileInputSplit> {
@Override
......@@ -60,20 +54,18 @@ public class SequentialFormatTest {
private int numberOfTuples;
private long blockSize;
protected long blockSize;
private int degreeOfParallelism;
private BlockInfo info = new SerializedInputFormat<IOReadableWritable>().createBlockInfo();
private int[] rawDataSizes;
private File tempFile;
protected File tempFile;
/**
* Initializes SequentialFormatTest.
*/
public SequentialFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) {
public SequentialFormatTestBase(int numberOfTuples, long blockSize, int degreeOfParallelism) {
this.numberOfTuples = numberOfTuples;
this.blockSize = blockSize;
this.degreeOfParallelism = degreeOfParallelism;
......@@ -90,7 +82,8 @@ public class SequentialFormatTest {
ByteCounter byteCounter = new ByteCounter();
DataOutputStream out = new DataOutputStream(byteCounter);
for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
this.getRecord(recordIndex).write(new OutputViewDataOutputStreamWrapper(out));
writeRecord(this.getRecord(recordIndex), new OutputViewDataOutputStreamWrapper
(out));
}
this.rawDataSizes[fileIndex] = byteCounter.getLength();
}
......@@ -118,7 +111,7 @@ public class SequentialFormatTest {
Assert.assertEquals(this.getExpectedBlockCount(fileIndex), sameFileSplits.size());
long lastBlockLength =
this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize()) + this.info.getInfoSize();
this.rawDataSizes[fileIndex] % (this.blockSize - getInfoSize()) + getInfoSize();
for (int index = 0; index < sameFileSplits.size(); index++) {
Assert.assertEquals(this.blockSize * index, sameFileSplits.get(index).getStart());
if (index < sameFileSplits.size() - 1) {
......@@ -134,13 +127,13 @@ public class SequentialFormatTest {
*/
@Test
public void checkRead() throws IOException {
SerializedInputFormat<Record> input = this.createInputFormat();
BinaryInputFormat<T> input = this.createInputFormat();
FileInputSplit[] inputSplits = input.createInputSplits(0);
Arrays.sort(inputSplits, new InputSplitSorter());
int readCount = 0;
for (FileInputSplit inputSplit : inputSplits) {
input.open(inputSplit);
Record record = new Record();
T record = createInstance();
while (!input.reachedEnd()) {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);
......@@ -156,7 +149,7 @@ public class SequentialFormatTest {
*/
@Test
public void checkStatistics() {
SerializedInputFormat<Record> input = this.createInputFormat();
BinaryInputFormat<T> input = this.createInputFormat();
BaseStatistics statistics = input.getStatistics(null);
Assert.assertEquals(this.numberOfTuples, statistics.getNumberOfRecords());
}
......@@ -181,13 +174,12 @@ public class SequentialFormatTest {
*/
@Before
public void writeTuples() throws IOException {
this.tempFile = File.createTempFile("SerializedInputFormat", null);
this.tempFile = File.createTempFile("BinaryInputFormat", null);
this.tempFile.deleteOnExit();
Configuration configuration = new Configuration();
configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
if (this.degreeOfParallelism == 1) {
SerializedOutputFormat output =
FormatUtil.openOutput(SerializedOutputFormat.class, this.tempFile.toURI().toString(),
BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI().toString(),
configuration);
for (int index = 0; index < this.numberOfTuples; index++) {
output.writeRecord(this.getRecord(index));
......@@ -198,10 +190,8 @@ public class SequentialFormatTest {
this.tempFile.mkdir();
int recordIndex = 0;
for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
SerializedOutputFormat output =
FormatUtil.openOutput(SerializedOutputFormat.class, this.tempFile.toURI() +
"/"
+ (fileIndex + 1), configuration);
BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI() + "/" +
(fileIndex+1), configuration);
for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
output.writeRecord(this.getRecord(recordIndex));
}
......@@ -222,44 +212,40 @@ public class SequentialFormatTest {
File[] files = this.tempFile.isDirectory() ? this.tempFile.listFiles() : new File[] { this.tempFile };
Arrays.sort(files);
for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
long lastBlockLength = this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize());
long lastBlockLength = this.rawDataSizes[fileIndex] % (this.blockSize - getInfoSize());
long expectedLength =
(this.getExpectedBlockCount(fileIndex) - 1) * this.blockSize + this.info.getInfoSize() +
(this.getExpectedBlockCount(fileIndex) - 1) * this.blockSize + getInfoSize() +
lastBlockLength;
Assert.assertEquals(expectedLength, files[fileIndex].length());
}
}
protected SerializedInputFormat<Record> createInputFormat() {
Configuration configuration = new Configuration();
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
abstract protected BinaryInputFormat<T> createInputFormat();
final SerializedInputFormat<Record> inputFormat = new SerializedInputFormat<Record>();
inputFormat.setFilePath(this.tempFile.toURI().toString());
inputFormat.configure(configuration);
return inputFormat;
}
abstract protected BinaryOutputFormat<T> createOutputFormat(final String path, final
Configuration configuration)
throws IOException;
abstract protected int getInfoSize();
/**
* Returns the record to write at the given position
*/
protected Record getRecord(int index) {
return new Record(new IntValue(index), new StringValue(String.valueOf(index)));
}
abstract protected T getRecord(int index);
abstract protected T createInstance();
abstract protected void writeRecord(T record, DataOutputView outputView) throws IOException;
/**
* Checks if both records are equal
*/
private void checkEquals(Record expected, Record actual) {
Assert.assertEquals(expected.getNumFields(), actual.getNumFields());
Assert.assertEquals(expected.getField(0, IntValue.class), actual.getField(0, IntValue.class));
Assert.assertEquals(expected.getField(1, StringValue.class), actual.getField(1, StringValue.class));
}
abstract protected void checkEquals(T expected, T actual);
private int getExpectedBlockCount(int fileIndex) {
int expectedBlockCount =
(int) Math.ceil((double) this.rawDataSizes[fileIndex] / (this.blockSize - this.info.getInfoSize()));
(int) Math.ceil((double) this.rawDataSizes[fileIndex] / (this.blockSize -
getInfoSize()));
return expectedBlockCount;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.io;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
/**
 * Runs the shared {@link SequentialFormatTestBase} checks against {@link SerializedInputFormat} and
 * {@link SerializedOutputFormat}, using {@link Record} instances as test data.
 */
@RunWith(Parameterized.class)
public class SerializedFormatTest extends SequentialFormatTestBase<Record> {

    /** Block info obtained from the input format under test; refreshed before every test. */
    private BlockInfo blockInfo;

    public SerializedFormatTest(int numberOfRecords, long blockSize, int degreeOfParallelism) {
        super(numberOfRecords, blockSize, degreeOfParallelism);
    }

    /**
     * Fetches the block info from a freshly created input format so that
     * {@link #getInfoSize()} can report its size.
     */
    @Before
    public void setup() {
        final BinaryInputFormat<Record> format = createInputFormat();
        this.blockInfo = format.createBlockInfo();
    }

    /**
     * Builds a {@link SerializedInputFormat} that reads from the temporary file and is configured
     * with the block size of this test run.
     */
    @Override
    protected BinaryInputFormat<Record> createInputFormat() {
        final SerializedInputFormat<Record> format = new SerializedInputFormat<Record>();
        format.setFilePath(this.tempFile.toURI().toString());

        final Configuration parameters = new Configuration();
        parameters.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
        format.configure(parameters);

        return format;
    }

    /**
     * Opens a {@link SerializedOutputFormat} on the given path via {@link FormatUtil}.
     */
    // The class literal forces the raw SerializedOutputFormat type, hence the unchecked conversion.
    @SuppressWarnings("unchecked")
    @Override
    protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration configuration)
            throws IOException {
        return FormatUtil.<Record, SerializedOutputFormat>openOutput(
                SerializedOutputFormat.class, path, configuration);
    }

    @Override
    protected int getInfoSize() {
        return this.blockInfo.getInfoSize();
    }

    /**
     * Returns a two-field record holding the index as an int and as its decimal string.
     */
    @Override
    protected Record getRecord(int index) {
        final IntValue number = new IntValue(index);
        final StringValue text = new StringValue(String.valueOf(index));
        return new Record(number, text);
    }

    @Override
    protected Record createInstance() {
        return new Record();
    }

    @Override
    protected void writeRecord(Record record, DataOutputView outputView) throws IOException {
        record.write(outputView);
    }

    /**
     * Asserts that both records have the same number of fields and equal int/string fields.
     */
    @Override
    protected void checkEquals(Record expected, Record actual) {
        Assert.assertEquals(expected.getNumFields(), actual.getNumFields());

        final IntValue expectedNumber = expected.getField(0, IntValue.class);
        final IntValue actualNumber = actual.getField(0, IntValue.class);
        Assert.assertEquals(expectedNumber, actualNumber);

        final StringValue expectedText = expected.getField(1, StringValue.class);
        final StringValue actualText = actual.getField(1, StringValue.class);
        Assert.assertEquals(expectedText, actualText);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import java.io.IOException;
/**
 * Reads elements by deserializing them with a given type serializer.
 *
 * @param <T> type of the elements produced by this input format
 */
public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> {

    // The parent BinaryInputFormat pins a serialVersionUID; do the same here instead of relying
    // on a compiler-derived value that changes with unrelated edits to this class.
    private static final long serialVersionUID = 1L;

    /** Serializer used to decode the records; assigned once in the constructor. */
    private final TypeSerializer<T> serializer;

    /**
     * Creates an input format that decodes its records with the given serializer.
     *
     * @param serializer serializer for the element type; a {@code null} value is accepted here
     *                   but makes every later {@code deserialize} call fail
     */
    public TypeSerializerInputFormat(TypeSerializer<T> serializer) {
        this.serializer = serializer;
    }

    /**
     * Reads the next element from the given input view by delegating to the serializer.
     *
     * @param reuse     instance handed to the serializer for reuse
     * @param dataInput view to read the serialized element from
     * @return the element returned by the serializer
     * @throws IOException           if the serializer fails to read from the input view
     * @throws IllegalStateException if no serializer was supplied at construction time
     */
    @Override
    protected T deserialize(T reuse, DataInputView dataInput) throws IOException {
        if (serializer == null) {
            // IllegalStateException is a RuntimeException, so existing catch blocks keep working.
            throw new IllegalStateException("TypeSerializerInputFormat requires a type serializer to " +
                    "be defined.");
        }

        return serializer.deserialize(reuse, dataInput);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
/**
 * Stores elements by serializing them with their type serializer.
 *
 * <p>The serializer is either set explicitly through {@link #setSerializer(TypeSerializer)} or
 * created from the type information passed to {@link #setInputType(TypeInformation)}.
 *
 * @param <T> type of the elements written by this output format
 */
public class TypeSerializerOutputFormat<T> extends BinaryOutputFormat<T> implements
        InputTypeConfigurable {

    // The parent BinaryOutputFormat pins a serialVersionUID; do the same here instead of relying
    // on a compiler-derived value that changes with unrelated edits to this class.
    private static final long serialVersionUID = 1L;

    /** Serializer used to encode the records; {@code null} until one of the setters is called. */
    private TypeSerializer<T> serializer = null;

    /**
     * Writes the record to the given output view by delegating to the serializer.
     *
     * @param record     element to write
     * @param dataOutput view receiving the serialized element
     * @throws IOException           if the serializer fails to write to the output view
     * @throws IllegalStateException if no serializer has been set yet
     */
    @Override
    protected void serialize(T record, DataOutputView dataOutput) throws IOException {
        if (serializer == null) {
            // IllegalStateException is a RuntimeException, so existing catch blocks keep working.
            throw new IllegalStateException("TypeSerializerOutputFormat requires a type serializer to " +
                    "be defined.");
        }

        serializer.serialize(record, dataOutput);
    }

    /**
     * Sets the serializer used to encode the records.
     *
     * @param serializer serializer for the element type
     */
    public void setSerializer(TypeSerializer<T> serializer) {
        this.serializer = serializer;
    }

    /**
     * Creates the serializer from the given type information and uses it for all later writes.
     *
     * @param type type information to create the serializer from
     */
    @Override
    @SuppressWarnings("unchecked")
    public void setInputType(TypeInformation<?> type) {
        // The wildcard hides the element type, so the cast cannot be checked by the compiler;
        // the caller is trusted to pass the type information of the elements actually written.
        serializer = (TypeSerializer<T>) type.createSerializer();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.api.common.io.BlockInfo;
import org.apache.flink.api.common.io.SequentialFormatTestBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
/**
 * Runs the shared {@link SequentialFormatTestBase} checks against {@link TypeSerializerInputFormat}
 * and {@link TypeSerializerOutputFormat}, using {@code Tuple2<Integer, String>} as test data.
 */
@RunWith(Parameterized.class)
public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> {

    /** Serializer shared by the input format, the output format and the raw-size calculation. */
    private TypeSerializer<Tuple2<Integer, String>> serializer;

    /** Block info obtained from the input format under test; refreshed before every test. */
    private BlockInfo block;

    /**
     * Sets up the test parameters and derives the tuple serializer from a sample record.
     */
    public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) {
        super(numberOfTuples, blockSize, degreeOfParallelism);

        final Tuple2<Integer, String> sample = getRecord(0);
        final TypeInformation<Tuple2<Integer, String>> typeInfo = TypeExtractor.getForObject(sample);
        this.serializer = typeInfo.createSerializer();
    }

    /**
     * Fetches the block info from a freshly created input format so that
     * {@link #getInfoSize()} can report its size.
     */
    @Before
    public void setup() {
        final BinaryInputFormat<Tuple2<Integer, String>> format = createInputFormat();
        this.block = format.createBlockInfo();
    }

    /**
     * Builds a {@link TypeSerializerInputFormat} that reads from the temporary file and is
     * configured with the block size of this test run.
     */
    @Override
    protected BinaryInputFormat<Tuple2<Integer, String>> createInputFormat() {
        final Configuration parameters = new Configuration();
        parameters.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);

        final TypeSerializerInputFormat<Tuple2<Integer, String>> format =
                new TypeSerializerInputFormat<Tuple2<Integer, String>>(this.serializer);
        format.setFilePath(this.tempFile.toURI().toString());
        format.configure(parameters);

        return format;
    }

    /**
     * Builds, configures and opens a {@link TypeSerializerOutputFormat} that overwrites the given
     * path. A missing configuration is replaced by an empty one.
     */
    @Override
    protected BinaryOutputFormat<Tuple2<Integer, String>> createOutputFormat(String path, Configuration configuration) throws IOException {
        final TypeSerializerOutputFormat<Tuple2<Integer, String>> format =
                new TypeSerializerOutputFormat<Tuple2<Integer, String>>();
        format.setSerializer(this.serializer);
        format.setOutputFilePath(new Path(path));
        format.setWriteMode(FileSystem.WriteMode.OVERWRITE);

        final Configuration parameters;
        if (configuration != null) {
            parameters = configuration;
        } else {
            parameters = new Configuration();
        }
        format.configure(parameters);

        // Opened as task 0 of 1, exactly as the original test did.
        format.open(0, 1);

        return format;
    }

    @Override
    protected int getInfoSize() {
        return this.block.getInfoSize();
    }

    /**
     * Returns a tuple holding the index and its decimal string representation.
     */
    @Override
    protected Tuple2<Integer, String> getRecord(int index) {
        final String text = String.valueOf(index);
        return new Tuple2<Integer, String>(index, text);
    }

    @Override
    protected Tuple2<Integer, String> createInstance() {
        return new Tuple2<Integer, String>();
    }

    @Override
    protected void writeRecord(Tuple2<Integer, String> record, DataOutputView outputView) throws IOException {
        this.serializer.serialize(record, outputView);
    }

    /**
     * Asserts that both tuples carry equal values in both fields.
     */
    @Override
    protected void checkEquals(Tuple2<Integer, String> expected, Tuple2<Integer, String> actual) {
        Assert.assertEquals(expected.f0, actual.f0);
        Assert.assertEquals(expected.f1, actual.f1);
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册