提交 942a6231 编写于 作者: A arvid 提交者: StephanEwen

Generified binary and sequential i/o formats

上级 509a717a
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream; import java.io.DataInputStream;
...@@ -32,9 +32,9 @@ import eu.stratosphere.nephele.fs.FileInputSplit; ...@@ -32,9 +32,9 @@ import eu.stratosphere.nephele.fs.FileInputSplit;
import eu.stratosphere.nephele.fs.FileStatus; import eu.stratosphere.nephele.fs.FileStatus;
import eu.stratosphere.nephele.fs.FileSystem; import eu.stratosphere.nephele.fs.FileSystem;
import eu.stratosphere.nephele.fs.Path; import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.nephele.util.StringUtils; import eu.stratosphere.nephele.util.StringUtils;
import eu.stratosphere.pact.common.io.statistics.BaseStatistics; import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
import eu.stratosphere.pact.common.type.PactRecord;
/** /**
* Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without
...@@ -42,7 +42,7 @@ import eu.stratosphere.pact.common.type.PactRecord; ...@@ -42,7 +42,7 @@ import eu.stratosphere.pact.common.type.PactRecord;
* *
* @author Arvid Heise * @author Arvid Heise
*/ */
public abstract class BinaryInputFormat extends FileInputFormat { public abstract class BinaryInputFormat<T extends Record> extends FileInputFormat<T> {
/** /**
* The log. * The log.
...@@ -152,29 +152,28 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -152,29 +152,28 @@ public abstract class BinaryInputFormat extends FileInputFormat {
*/ */
@Override @Override
public SequentialStatistics getStatistics(BaseStatistics cachedStats) { public SequentialStatistics getStatistics(BaseStatistics cachedStats) {
final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null; (FileBaseStatistics) cachedStats : null;
try { try {
final Path filePath = this.filePath; final Path filePath = this.filePath;
// get the filesystem // get the filesystem
final FileSystem fs = FileSystem.get(filePath.toUri()); final FileSystem fs = FileSystem.get(filePath.toUri());
final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1); final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1);
// let the file input format deal with the up-to-date check and the basic size // let the file input format deal with the up-to-date check and the basic size
final FileBaseStatistics stats = getFileStats(cachedFileStats, filePath, fs, allFiles); final FileBaseStatistics stats = getFileStats(cachedFileStats, filePath, fs, allFiles);
if (stats == null) { if (stats == null) {
return null; return null;
} }
// check whether the file stats are still sequential stats (in that case they are still valid) // check whether the file stats are still sequential stats (in that case they are still valid)
if (stats instanceof SequentialStatistics) { if (stats instanceof SequentialStatistics) {
return (SequentialStatistics) stats; return (SequentialStatistics) stats;
} else {
return createStatistics(allFiles, stats);
} }
return createStatistics(allFiles, stats);
} catch (IOException ioex) { } catch (IOException ioex) {
if (LOG.isWarnEnabled()) if (LOG.isWarnEnabled())
LOG.warn(String.format("Could not determine complete statistics for file '%s' due to an I/O error: %s", LOG.warn(String.format("Could not determine complete statistics for file '%s' due to an I/O error: %s",
...@@ -204,7 +203,8 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -204,7 +203,8 @@ public abstract class BinaryInputFormat extends FileInputFormat {
* @param stats * @param stats
* The pre-filled statistics. * The pre-filled statistics.
*/ */
protected SequentialStatistics createStatistics(List<FileStatus> files, FileBaseStatistics stats) throws IOException { protected SequentialStatistics createStatistics(List<FileStatus> files, FileBaseStatistics stats)
throws IOException {
if (files.isEmpty()) if (files.isEmpty())
return null; return null;
...@@ -225,11 +225,12 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -225,11 +225,12 @@ public abstract class BinaryInputFormat extends FileInputFormat {
} }
final float avgWidth = totalCount == 0 ? 0 : ((float) stats.getTotalInputSize() / totalCount); final float avgWidth = totalCount == 0 ? 0 : ((float) stats.getTotalInputSize() / totalCount);
return new SequentialStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth, totalCount); return new SequentialStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth,
totalCount);
} }
private static class SequentialStatistics extends FileBaseStatistics { private static class SequentialStatistics extends FileBaseStatistics {
private final long numberOfRecords; private final long numberOfRecords;
public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRecord, long numberOfRecords) { public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRecord, long numberOfRecords) {
...@@ -285,7 +286,7 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -285,7 +286,7 @@ public abstract class BinaryInputFormat extends FileInputFormat {
* @see eu.stratosphere.pact.common.io.InputFormat#nextRecord(eu.stratosphere.pact.common.type.PactRecord) * @see eu.stratosphere.pact.common.io.InputFormat#nextRecord(eu.stratosphere.pact.common.type.PactRecord)
*/ */
@Override @Override
public boolean nextRecord(PactRecord record) throws IOException { public boolean nextRecord(T record) throws IOException {
if (this.reachedEnd()) if (this.reachedEnd())
return false; return false;
this.deserialize(record, this.dataInputStream); this.deserialize(record, this.dataInputStream);
...@@ -293,7 +294,7 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -293,7 +294,7 @@ public abstract class BinaryInputFormat extends FileInputFormat {
return true; return true;
} }
protected abstract void deserialize(PactRecord record, DataInput dataInput) throws IOException; protected abstract void deserialize(T record, DataInput dataInput) throws IOException;
/** /**
* Writes a block info at the end of the blocks.<br> * Writes a block info at the end of the blocks.<br>
...@@ -320,12 +321,12 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -320,12 +321,12 @@ public abstract class BinaryInputFormat extends FileInputFormat {
public int read() throws IOException { public int read() throws IOException {
if (this.blockPos++ >= this.maxPayloadSize) if (this.blockPos++ >= this.maxPayloadSize)
this.skipHeader(); this.skipHeader();
return in.read(); return this.in.read();
} }
private void skipHeader() throws IOException { private void skipHeader() throws IOException {
byte[] dummy = new byte[BinaryInputFormat.this.blockInfo.getInfoSize()]; byte[] dummy = new byte[BinaryInputFormat.this.blockInfo.getInfoSize()];
in.read(dummy, 0, dummy.length); this.in.read(dummy, 0, dummy.length);
this.blockPos = 0; this.blockPos = 0;
} }
...@@ -347,7 +348,7 @@ public abstract class BinaryInputFormat extends FileInputFormat { ...@@ -347,7 +348,7 @@ public abstract class BinaryInputFormat extends FileInputFormat {
int totalRead = 0; int totalRead = 0;
for (int remainingLength = len, offset = off; remainingLength > 0;) { for (int remainingLength = len, offset = off; remainingLength > 0;) {
int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos); int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos);
int read = in.read(b, offset, blockLen); int read = this.in.read(b, offset, blockLen);
if (read < 0) if (read < 0)
return read; return read;
totalRead += read; totalRead += read;
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
...@@ -21,12 +21,12 @@ import java.io.IOException; ...@@ -21,12 +21,12 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.common.type.PactRecord; import eu.stratosphere.nephele.types.Record;
/** /**
* @author Arvid Heise * @author Arvid Heise
*/ */
public abstract class BinaryOutputFormat extends FileOutputFormat { public abstract class BinaryOutputFormat<T extends Record> extends FileOutputFormat<T> {
/** /**
* The config parameter which defines the fixed length of a record. * The config parameter which defines the fixed length of a record.
*/ */
...@@ -52,7 +52,8 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { ...@@ -52,7 +52,8 @@ public abstract class BinaryOutputFormat extends FileOutputFormat {
this.dataOutputStream.close(); this.dataOutputStream.close();
super.close(); super.close();
} }
@SuppressWarnings("unused")
protected void complementBlockInfo(BlockInfo blockInfo) throws IOException { protected void complementBlockInfo(BlockInfo blockInfo) throws IOException {
} }
...@@ -92,14 +93,14 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { ...@@ -92,14 +93,14 @@ public abstract class BinaryOutputFormat extends FileOutputFormat {
this.dataOutputStream = new DataOutputStream(this.blockBasedInput); this.dataOutputStream = new DataOutputStream(this.blockBasedInput);
} }
protected abstract void serialize(PactRecord record, DataOutput dataOutput) throws IOException; protected abstract void serialize(T record, DataOutput dataOutput) throws IOException;
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see eu.stratosphere.pact.common.io.OutputFormat#writeRecord(eu.stratosphere.pact.common.type.PactRecord) * @see eu.stratosphere.pact.common.io.OutputFormat#writeRecord(eu.stratosphere.pact.common.type.PactRecord)
*/ */
@Override @Override
public void writeRecord(PactRecord record) throws IOException { public void writeRecord(T record) throws IOException {
this.blockBasedInput.startRecord(); this.blockBasedInput.startRecord();
this.serialize(record, this.dataOutputStream); this.serialize(record, this.dataOutputStream);
} }
...@@ -172,7 +173,7 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { ...@@ -172,7 +173,7 @@ public abstract class BinaryOutputFormat extends FileOutputFormat {
for (int remainingLength = len, offset = off; remainingLength > 0;) { for (int remainingLength = len, offset = off; remainingLength > 0;) {
int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos); int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos);
out.write(b, offset, blockLen); this.out.write(b, offset, blockLen);
this.blockPos += blockLen; this.blockPos += blockLen;
if (this.blockPos >= this.maxPayloadSize) if (this.blockPos >= this.maxPayloadSize)
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
...@@ -25,9 +25,8 @@ import eu.stratosphere.nephele.fs.FileInputSplit; ...@@ -25,9 +25,8 @@ import eu.stratosphere.nephele.fs.FileInputSplit;
import eu.stratosphere.nephele.fs.FileStatus; import eu.stratosphere.nephele.fs.FileStatus;
import eu.stratosphere.nephele.fs.FileSystem; import eu.stratosphere.nephele.fs.FileSystem;
import eu.stratosphere.nephele.fs.Path; import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.pact.common.util.ReflectionUtil; import eu.stratosphere.pact.common.util.ReflectionUtil;
import eu.stratosphere.pact.generic.io.InputFormat;
import eu.stratosphere.pact.generic.io.OutputFormat;
/** /**
* Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}. * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
...@@ -36,6 +35,7 @@ import eu.stratosphere.pact.generic.io.OutputFormat; ...@@ -36,6 +35,7 @@ import eu.stratosphere.pact.generic.io.OutputFormat;
*/ */
public class FormatUtil { public class FormatUtil {
/** /**
* Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration} * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
* initializes the format. * initializes the format.
...@@ -52,12 +52,12 @@ public class FormatUtil { ...@@ -52,12 +52,12 @@ public class FormatUtil {
* @throws IOException * @throws IOException
* if an I/O error occurred while accessing the file or initializing the InputFormat. * if an I/O error occurred while accessing the file or initializing the InputFormat.
*/ */
public static <T extends FileInputFormat> T openInput( public static <T extends Record, F extends FileInputFormat<T>> F openInput(
Class<T> inputFormatClass, String path, Configuration configuration) throws IOException { Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
configuration = configuration == null ? new Configuration() : configuration; configuration = configuration == null ? new Configuration() : configuration;
Path normalizedPath = normalizePath(new Path(path)); Path normalizedPath = normalizePath(new Path(path));
final T inputFormat = ReflectionUtil.newInstance(inputFormatClass); final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, path); configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, path);
configuration.setLong(FileInputFormat.INPUT_STREAM_OPEN_TIMEOUT_KEY, 0); configuration.setLong(FileInputFormat.INPUT_STREAM_OPEN_TIMEOUT_KEY, 0);
...@@ -88,15 +88,15 @@ public class FormatUtil { ...@@ -88,15 +88,15 @@ public class FormatUtil {
* if an I/O error occurred while accessing the files or initializing the InputFormat. * if an I/O error occurred while accessing the files or initializing the InputFormat.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T extends FileInputFormat> T[] openAllInputs( public static <T extends Record, F extends FileInputFormat<T>> F[] openAllInputs(
Class<T> inputFormatClass, String path, Configuration configuration) throws IOException { Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
Path nephelePath = new Path(path); Path nephelePath = new Path(path);
FileSystem fs = nephelePath.getFileSystem(); FileSystem fs = nephelePath.getFileSystem();
FileStatus fileStatus = fs.getFileStatus(nephelePath); FileStatus fileStatus = fs.getFileStatus(nephelePath);
if (!fileStatus.isDir()) if (!fileStatus.isDir())
return (T[]) new FileInputFormat[] { openInput(inputFormatClass, path, configuration) }; return (F[]) new FileInputFormat[] { openInput(inputFormatClass, path, configuration) };
FileStatus[] list = fs.listStatus(nephelePath); FileStatus[] list = fs.listStatus(nephelePath);
T[] formats = (T[]) new FileInputFormat[list.length]; F[] formats = (F[]) new FileInputFormat[list.length];
for (int index = 0; index < formats.length; index++) for (int index = 0; index < formats.length; index++)
formats[index] = openInput(inputFormatClass, list[index].getPath().toString(), configuration); formats[index] = openInput(inputFormatClass, list[index].getPath().toString(), configuration);
return formats; return formats;
...@@ -118,13 +118,13 @@ public class FormatUtil { ...@@ -118,13 +118,13 @@ public class FormatUtil {
* @throws IOException * @throws IOException
* if an I/O error occurred while accessing the file or initializing the OutputFormat. * if an I/O error occurred while accessing the file or initializing the OutputFormat.
*/ */
public static <T extends FileOutputFormat> T openOutput( public static <T extends Record, F extends FileOutputFormat<T>> F openOutput(
Class<T> outputFormatClass, String path, Configuration configuration) throws IOException { Class<F> outputFormatClass, String pathString, Configuration configuration) throws IOException {
final T outputFormat = ReflectionUtil.newInstance(outputFormatClass); final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
configuration = configuration == null ? new Configuration() : configuration; configuration = configuration == null ? new Configuration() : configuration;
configuration.setString(FileOutputFormat.FILE_PARAMETER_KEY, path); configuration.setString(FileOutputFormat.FILE_PARAMETER_KEY, pathString);
configuration.setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, 0); configuration.setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, 0);
outputFormat.configure(configuration); outputFormat.configure(configuration);
outputFormat.open(1); outputFormat.open(1);
......
...@@ -12,20 +12,20 @@ ...@@ -12,20 +12,20 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.DataInput; import java.io.DataInput;
import java.io.IOException; import java.io.IOException;
import eu.stratosphere.pact.common.type.PactRecord; import eu.stratosphere.nephele.types.Record;
/** /**
* Reads the {@link PactRecord}s from the native format which is deserializable without configuration. * Reads the {@link Record}s from the native format which is deserializable without configuration.
* *
* @author Arvid Heise * @author Arvid Heise
* @see SequentialOutputFormat * @see SequentialOutputFormat
*/ */
public class SequentialInputFormat extends BinaryInputFormat { public class SequentialInputFormat<T extends Record> extends BinaryInputFormat<T> {
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see * @see
...@@ -33,7 +33,7 @@ public class SequentialInputFormat extends BinaryInputFormat { ...@@ -33,7 +33,7 @@ public class SequentialInputFormat extends BinaryInputFormat {
* , java.io.DataInput) * , java.io.DataInput)
*/ */
@Override @Override
protected void deserialize(PactRecord record, DataInput dataInput) throws IOException { protected void deserialize(Record record, DataInput dataInput) throws IOException {
record.read(dataInput); record.read(dataInput);
} }
} }
\ No newline at end of file
...@@ -12,21 +12,21 @@ ...@@ -12,21 +12,21 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import eu.stratosphere.pact.common.type.PactRecord; import eu.stratosphere.nephele.types.Record;
/** /**
* Stores complete {@link PactRecord}s in an efficient binary format which is deserializable without configuration. * Stores complete {@link Record}s in an efficient binary format which is deserializable without configuration.
* *
* @author Arvid Heise * @author Arvid Heise
* @see BlockBasedOutputFormat * @see BlockBasedOutputFormat
* @see SequentialInputFormat * @see SequentialInputFormat
*/ */
public class SequentialOutputFormat extends BinaryOutputFormat { public class SequentialOutputFormat extends BinaryOutputFormat<Record> {
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see * @see
...@@ -34,7 +34,7 @@ public class SequentialOutputFormat extends BinaryOutputFormat { ...@@ -34,7 +34,7 @@ public class SequentialOutputFormat extends BinaryOutputFormat {
* , java.io.DataOutput) * , java.io.DataOutput)
*/ */
@Override @Override
protected void serialize(PactRecord record, DataOutput dataOutputStream) throws IOException { protected void serialize(Record record, DataOutput dataOutputStream) throws IOException {
record.write(dataOutputStream); record.write(dataOutputStream);
} }
} }
\ No newline at end of file
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.pact.common.io; package eu.stratosphere.pact.generic.io;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
...@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters; ...@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.fs.FileInputSplit; import eu.stratosphere.nephele.fs.FileInputSplit;
import eu.stratosphere.nephele.fs.Path; import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.pact.common.io.statistics.BaseStatistics; import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
import eu.stratosphere.pact.common.type.PactRecord; import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactInteger; import eu.stratosphere.pact.common.type.base.PactInteger;
...@@ -67,7 +68,7 @@ public class SequentialFormatTest { ...@@ -67,7 +68,7 @@ public class SequentialFormatTest {
private int degreeOfParallelism; private int degreeOfParallelism;
private BlockInfo info = new SequentialInputFormat().createBlockInfo(); private BlockInfo info = new SequentialInputFormat<Record>().createBlockInfo();
private int[] rawDataSizes; private int[] rawDataSizes;
...@@ -89,12 +90,12 @@ public class SequentialFormatTest { ...@@ -89,12 +90,12 @@ public class SequentialFormatTest {
@Before @Before
public void calcRawDataSize() throws IOException { public void calcRawDataSize() throws IOException {
int recordIndex = 0; int recordIndex = 0;
for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
ByteCounter byteCounter = new ByteCounter(); ByteCounter byteCounter = new ByteCounter();
DataOutputStream out = new DataOutputStream(byteCounter); DataOutputStream out = new DataOutputStream(byteCounter);
for (int fileCount = 0; fileCount < getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++)
getRecord(recordIndex).write(out); this.getRecord(recordIndex).write(out);
rawDataSizes[fileIndex] = byteCounter.getLength(); this.rawDataSizes[fileIndex] = byteCounter.getLength();
} }
} }
...@@ -103,26 +104,27 @@ public class SequentialFormatTest { ...@@ -103,26 +104,27 @@ public class SequentialFormatTest {
*/ */
@Test @Test
public void checkInputSplits() throws IOException { public void checkInputSplits() throws IOException {
FileInputSplit[] inputSplits = createInputFormat().createInputSplits(0); FileInputSplit[] inputSplits = this.createInputFormat().createInputSplits(0);
Arrays.sort(inputSplits, new InputSplitSorter()); Arrays.sort(inputSplits, new InputSplitSorter());
int splitIndex = 0; int splitIndex = 0;
for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
List<FileInputSplit> sameFileSplits = new ArrayList<FileInputSplit>(); List<FileInputSplit> sameFileSplits = new ArrayList<FileInputSplit>();
Path lastPath = inputSplits[splitIndex].getPath(); Path lastPath = inputSplits[splitIndex].getPath();
for (; splitIndex < inputSplits.length; splitIndex++) for (; splitIndex < inputSplits.length; splitIndex++) {
if (!inputSplits[splitIndex].getPath().equals(lastPath)) if (!inputSplits[splitIndex].getPath().equals(lastPath))
break; break;
else sameFileSplits.add(inputSplits[splitIndex]);
sameFileSplits.add(inputSplits[splitIndex]); }
Assert.assertEquals(getExpectedBlockCount(fileIndex), sameFileSplits.size()); Assert.assertEquals(this.getExpectedBlockCount(fileIndex), sameFileSplits.size());
long lastBlockLength = rawDataSizes[fileIndex] % (blockSize - info.getInfoSize()) + info.getInfoSize(); long lastBlockLength =
this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize()) + this.info.getInfoSize();
for (int index = 0; index < sameFileSplits.size(); index++) { for (int index = 0; index < sameFileSplits.size(); index++) {
Assert.assertEquals(blockSize * index, sameFileSplits.get(index).getStart()); Assert.assertEquals(this.blockSize * index, sameFileSplits.get(index).getStart());
if (index < sameFileSplits.size() - 1) if (index < sameFileSplits.size() - 1)
Assert.assertEquals(blockSize, sameFileSplits.get(index).getLength()); Assert.assertEquals(this.blockSize, sameFileSplits.get(index).getLength());
} }
Assert.assertEquals(lastBlockLength, sameFileSplits.get(sameFileSplits.size() - 1).getLength()); Assert.assertEquals(lastBlockLength, sameFileSplits.get(sameFileSplits.size() - 1).getLength());
} }
...@@ -133,7 +135,7 @@ public class SequentialFormatTest { ...@@ -133,7 +135,7 @@ public class SequentialFormatTest {
*/ */
@Test @Test
public void checkRead() throws IOException { public void checkRead() throws IOException {
SequentialInputFormat input = createInputFormat(); SequentialInputFormat<PactRecord> input = this.createInputFormat();
FileInputSplit[] inputSplits = input.createInputSplits(0); FileInputSplit[] inputSplits = input.createInputSplits(0);
Arrays.sort(inputSplits, new InputSplitSorter()); Arrays.sort(inputSplits, new InputSplitSorter());
int readCount = 0; int readCount = 0;
...@@ -142,32 +144,32 @@ public class SequentialFormatTest { ...@@ -142,32 +144,32 @@ public class SequentialFormatTest {
PactRecord record = new PactRecord(); PactRecord record = new PactRecord();
while (!input.reachedEnd()) while (!input.reachedEnd())
if (input.nextRecord(record)) { if (input.nextRecord(record)) {
checkEquals(getRecord(readCount), record); this.checkEquals(this.getRecord(readCount), record);
readCount++; readCount++;
} }
} }
Assert.assertEquals(numberOfTuples, readCount); Assert.assertEquals(this.numberOfTuples, readCount);
} }
/** /**
* Tests the statistics of the given format. * Tests the statistics of the given format.
*/ */
@Test @Test
public void checkStatistics() throws IOException { public void checkStatistics() {
SequentialInputFormat input = createInputFormat(); SequentialInputFormat<PactRecord> input = this.createInputFormat();
BaseStatistics statistics = input.getStatistics(null); BaseStatistics statistics = input.getStatistics(null);
Assert.assertEquals(numberOfTuples, statistics.getNumberOfRecords()); Assert.assertEquals(this.numberOfTuples, statistics.getNumberOfRecords());
} }
@After @After
public void cleanup() { public void cleanup() {
deleteRecursively(tempFile); this.deleteRecursively(this.tempFile);
} }
private void deleteRecursively(File file) { private void deleteRecursively(File file) {
if (file.isDirectory()) if (file.isDirectory())
for (File subFile : file.listFiles()) for (File subFile : file.listFiles())
deleteRecursively(subFile); this.deleteRecursively(subFile);
else else
file.delete(); file.delete();
} }
...@@ -177,57 +179,59 @@ public class SequentialFormatTest { ...@@ -177,57 +179,59 @@ public class SequentialFormatTest {
*/ */
@Before @Before
public void writeTuples() throws IOException { public void writeTuples() throws IOException {
tempFile = File.createTempFile("SequentialInputFormat", null); this.tempFile = File.createTempFile("SequentialInputFormat", null);
tempFile.deleteOnExit(); this.tempFile.deleteOnExit();
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
configuration.setLong(SequentialOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
if (degreeOfParallelism == 1) { if (this.degreeOfParallelism == 1) {
SequentialOutputFormat output = SequentialOutputFormat output =
FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + tempFile.getAbsolutePath(), FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + this.tempFile.getAbsolutePath(),
configuration); configuration);
for (int index = 0; index < numberOfTuples; index++) for (int index = 0; index < this.numberOfTuples; index++)
output.writeRecord(getRecord(index)); output.writeRecord(this.getRecord(index));
output.close(); output.close();
} else { } else {
tempFile.delete(); this.tempFile.delete();
tempFile.mkdir(); this.tempFile.mkdir();
int recordIndex = 0; int recordIndex = 0;
for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
SequentialOutputFormat output = SequentialOutputFormat output =
FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + tempFile.getAbsolutePath() + "/" FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + this.tempFile.getAbsolutePath() +
"/"
+ (fileIndex + 1), configuration); + (fileIndex + 1), configuration);
for (int fileCount = 0; fileCount < getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++)
output.writeRecord(getRecord(recordIndex)); output.writeRecord(this.getRecord(recordIndex));
output.close(); output.close();
} }
} }
} }
private int getNumberOfTuplesPerFile(int fileIndex) { private int getNumberOfTuplesPerFile(@SuppressWarnings("unused") int fileIndex) {
return numberOfTuples / degreeOfParallelism; return this.numberOfTuples / this.degreeOfParallelism;
} }
/** /**
* Tests if the length of the file matches the expected value. * Tests if the length of the file matches the expected value.
*/ */
@Test @Test
public void checkLength() throws IOException { public void checkLength() {
File[] files = tempFile.isDirectory() ? tempFile.listFiles() : new File[] { tempFile }; File[] files = this.tempFile.isDirectory() ? this.tempFile.listFiles() : new File[] { this.tempFile };
Arrays.sort(files); Arrays.sort(files);
for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
long lastBlockLength = rawDataSizes[fileIndex] % (blockSize - info.getInfoSize()); long lastBlockLength = this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize());
long expectedLength = long expectedLength =
(getExpectedBlockCount(fileIndex) - 1) * blockSize + info.getInfoSize() + lastBlockLength; (this.getExpectedBlockCount(fileIndex) - 1) * this.blockSize + this.info.getInfoSize() +
lastBlockLength;
Assert.assertEquals(expectedLength, files[fileIndex].length()); Assert.assertEquals(expectedLength, files[fileIndex].length());
} }
} }
protected SequentialInputFormat createInputFormat() throws IOException { protected SequentialInputFormat<PactRecord> createInputFormat() {
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, "file://" + tempFile.getAbsolutePath()); configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, "file://" + this.tempFile.getAbsolutePath());
configuration.setLong(SequentialInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
final SequentialInputFormat inputFormat = new SequentialInputFormat(); final SequentialInputFormat<PactRecord> inputFormat = new SequentialInputFormat<PactRecord>();
inputFormat.configure(configuration); inputFormat.configure(configuration);
return inputFormat; return inputFormat;
} }
...@@ -249,7 +253,8 @@ public class SequentialFormatTest { ...@@ -249,7 +253,8 @@ public class SequentialFormatTest {
} }
private int getExpectedBlockCount(int fileIndex) { private int getExpectedBlockCount(int fileIndex) {
int expectedBlockCount = (int) Math.ceil((double) rawDataSizes[fileIndex] / (blockSize - info.getInfoSize())); int expectedBlockCount =
(int) Math.ceil((double) this.rawDataSizes[fileIndex] / (this.blockSize - this.info.getInfoSize()));
return expectedBlockCount; return expectedBlockCount;
} }
...@@ -258,7 +263,7 @@ public class SequentialFormatTest { ...@@ -258,7 +263,7 @@ public class SequentialFormatTest {
ArrayList<Object[]> params = new ArrayList<Object[]>(); ArrayList<Object[]> params = new ArrayList<Object[]>();
for (int dop = 1; dop <= 2; dop++) { for (int dop = 1; dop <= 2; dop++) {
// numberOfTuples, blockSize, dop // numberOfTuples, blockSize, dop
params.add(new Object[] { 100, SequentialOutputFormat.NATIVE_BLOCK_SIZE, dop }); params.add(new Object[] { 100, BinaryOutputFormat.NATIVE_BLOCK_SIZE, dop });
params.add(new Object[] { 100, 1000, dop }); params.add(new Object[] { 100, 1000, dop });
params.add(new Object[] { 100, 1 << 20, dop }); params.add(new Object[] { 100, 1 << 20, dop });
params.add(new Object[] { 10000, 1000, dop }); params.add(new Object[] { 10000, 1000, dop });
...@@ -286,7 +291,7 @@ public class SequentialFormatTest { ...@@ -286,7 +291,7 @@ public class SequentialFormatTest {
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
length++; this.length++;
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册