From 942a62311d3334b199eccaa47cdaca62c1bfc6ac Mon Sep 17 00:00:00 2001 From: arvid Date: Thu, 30 May 2013 21:29:22 +0200 Subject: [PATCH] Generified binary and sequential i/o formats --- .../io/BinaryInputFormat.java | 39 +++---- .../io/BinaryOutputFormat.java | 15 +-- .../{common => generic}/io/BlockInfo.java | 2 +- .../{common => generic}/io/FormatUtil.java | 28 ++--- .../io/SequentialInputFormat.java | 10 +- .../io/SequentialOutputFormat.java | 10 +- .../io/SequentialFormatTest.java | 105 +++++++++--------- 7 files changed, 108 insertions(+), 101 deletions(-) rename pact/pact-common/src/main/java/eu/stratosphere/pact/{common => generic}/io/BinaryInputFormat.java (94%) rename pact/pact-common/src/main/java/eu/stratosphere/pact/{common => generic}/io/BinaryOutputFormat.java (93%) rename pact/pact-common/src/main/java/eu/stratosphere/pact/{common => generic}/io/BlockInfo.java (98%) rename pact/pact-common/src/main/java/eu/stratosphere/pact/{common => generic}/io/FormatUtil.java (86%) rename pact/pact-common/src/main/java/eu/stratosphere/pact/{common => generic}/io/SequentialInputFormat.java (77%) rename pact/pact-common/src/main/java/eu/stratosphere/pact/{common => generic}/io/SequentialOutputFormat.java (77%) rename pact/pact-common/src/test/java/eu/stratosphere/pact/{common => generic}/io/SequentialFormatTest.java (67%) diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BinaryInputFormat.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BinaryInputFormat.java similarity index 94% rename from pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BinaryInputFormat.java rename to pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BinaryInputFormat.java index 47263a82285..9d6c6ef7f65 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BinaryInputFormat.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BinaryInputFormat.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.DataInput; import java.io.DataInputStream; @@ -32,9 +32,9 @@ import eu.stratosphere.nephele.fs.FileInputSplit; import eu.stratosphere.nephele.fs.FileStatus; import eu.stratosphere.nephele.fs.FileSystem; import eu.stratosphere.nephele.fs.Path; +import eu.stratosphere.nephele.types.Record; import eu.stratosphere.nephele.util.StringUtils; import eu.stratosphere.pact.common.io.statistics.BaseStatistics; -import eu.stratosphere.pact.common.type.PactRecord; /** * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without @@ -42,7 +42,7 @@ import eu.stratosphere.pact.common.type.PactRecord; * * @author Arvid Heise */ -public abstract class BinaryInputFormat extends FileInputFormat { +public abstract class BinaryInputFormat extends FileInputFormat { /** * The log. @@ -152,29 +152,28 @@ public abstract class BinaryInputFormat extends FileInputFormat { */ @Override public SequentialStatistics getStatistics(BaseStatistics cachedStats) { - + final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? - (FileBaseStatistics) cachedStats : null; - + (FileBaseStatistics) cachedStats : null; + try { final Path filePath = this.filePath; - + // get the filesystem final FileSystem fs = FileSystem.get(filePath.toUri()); final ArrayList allFiles = new ArrayList(1); - + // let the file input format deal with the up-to-date check and the basic size final FileBaseStatistics stats = getFileStats(cachedFileStats, filePath, fs, allFiles); if (stats == null) { return null; } - + // check whether the file stats are still sequential stats (in that case they are still valid) if (stats instanceof SequentialStatistics) { return (SequentialStatistics) stats; - } else { - return createStatistics(allFiles, stats); } + return createStatistics(allFiles, stats); } catch (IOException ioex) { if (LOG.isWarnEnabled()) LOG.warn(String.format("Could not determine complete statistics for file '%s' due to an I/O error: %s", @@ -204,7 +203,8 @@ public abstract class BinaryInputFormat extends FileInputFormat { * @param stats * The pre-filled statistics. */ - protected SequentialStatistics createStatistics(List files, FileBaseStatistics stats) throws IOException { + protected SequentialStatistics createStatistics(List files, FileBaseStatistics stats) + throws IOException { if (files.isEmpty()) return null; @@ -225,11 +225,12 @@ public abstract class BinaryInputFormat extends FileInputFormat { } final float avgWidth = totalCount == 0 ? 0 : ((float) stats.getTotalInputSize() / totalCount); - return new SequentialStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth, totalCount); + return new SequentialStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth, + totalCount); } private static class SequentialStatistics extends FileBaseStatistics { - + private final long numberOfRecords; public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRecord, long numberOfRecords) { @@ -285,7 +286,7 @@ public abstract class BinaryInputFormat extends FileInputFormat { * @see eu.stratosphere.pact.common.io.InputFormat#nextRecord(eu.stratosphere.pact.common.type.PactRecord) */ @Override - public boolean nextRecord(PactRecord record) throws IOException { + public boolean nextRecord(T record) throws IOException { if (this.reachedEnd()) return false; this.deserialize(record, this.dataInputStream); @@ -293,7 +294,7 @@ public abstract class BinaryInputFormat extends FileInputFormat { return true; } - protected abstract void deserialize(PactRecord record, DataInput dataInput) throws IOException; + protected abstract void deserialize(T record, DataInput dataInput) throws IOException; /** * Writes a block info at the end of the blocks.
@@ -320,12 +321,12 @@ public abstract class BinaryInputFormat extends FileInputFormat { public int read() throws IOException { if (this.blockPos++ >= this.maxPayloadSize) this.skipHeader(); - return in.read(); + return this.in.read(); } private void skipHeader() throws IOException { byte[] dummy = new byte[BinaryInputFormat.this.blockInfo.getInfoSize()]; - in.read(dummy, 0, dummy.length); + this.in.read(dummy, 0, dummy.length); this.blockPos = 0; } @@ -347,7 +348,7 @@ public abstract class BinaryInputFormat extends FileInputFormat { int totalRead = 0; for (int remainingLength = len, offset = off; remainingLength > 0;) { int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos); - int read = in.read(b, offset, blockLen); + int read = this.in.read(b, offset, blockLen); if (read < 0) return read; totalRead += read; diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BinaryOutputFormat.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BinaryOutputFormat.java similarity index 93% rename from pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BinaryOutputFormat.java rename to pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BinaryOutputFormat.java index a854921bb49..12b8fb9b8d7 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BinaryOutputFormat.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BinaryOutputFormat.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.DataOutput; import java.io.DataOutputStream; @@ -21,12 +21,12 @@ import java.io.IOException; import java.io.OutputStream; import eu.stratosphere.nephele.configuration.Configuration; -import eu.stratosphere.pact.common.type.PactRecord; +import eu.stratosphere.nephele.types.Record; /** * @author Arvid Heise */ -public abstract class BinaryOutputFormat extends FileOutputFormat { +public abstract class BinaryOutputFormat extends FileOutputFormat { /** * The config parameter which defines the fixed length of a record. */ @@ -52,7 +52,8 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { this.dataOutputStream.close(); super.close(); } - + + @SuppressWarnings("unused") protected void complementBlockInfo(BlockInfo blockInfo) throws IOException { } @@ -92,14 +93,14 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { this.dataOutputStream = new DataOutputStream(this.blockBasedInput); } - protected abstract void serialize(PactRecord record, DataOutput dataOutput) throws IOException; + protected abstract void serialize(T record, DataOutput dataOutput) throws IOException; /* * (non-Javadoc) * @see eu.stratosphere.pact.common.io.OutputFormat#writeRecord(eu.stratosphere.pact.common.type.PactRecord) */ @Override - public void writeRecord(PactRecord record) throws IOException { + public void writeRecord(T record) throws IOException { this.blockBasedInput.startRecord(); this.serialize(record, this.dataOutputStream); } @@ -172,7 +173,7 @@ public abstract class BinaryOutputFormat extends FileOutputFormat { for (int remainingLength = len, offset = off; remainingLength > 0;) { int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos); - out.write(b, offset, blockLen); + this.out.write(b, offset, blockLen); this.blockPos += blockLen; if (this.blockPos >= this.maxPayloadSize) diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BlockInfo.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BlockInfo.java similarity index 98% rename from pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BlockInfo.java rename to pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BlockInfo.java index 01b44d8d531..75f68a414ef 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/BlockInfo.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/BlockInfo.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.DataInput; import java.io.DataOutput; diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/FormatUtil.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/FormatUtil.java similarity index 86% rename from pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/FormatUtil.java rename to pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/FormatUtil.java index e78af6be76f..2352fda8bd6 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/FormatUtil.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/FormatUtil.java @@ -13,7 +13,7 @@ * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.IOException; import java.net.URI; @@ -25,9 +25,8 @@ import eu.stratosphere.nephele.fs.FileInputSplit; import eu.stratosphere.nephele.fs.FileStatus; import eu.stratosphere.nephele.fs.FileSystem; import eu.stratosphere.nephele.fs.Path; +import eu.stratosphere.nephele.types.Record; import eu.stratosphere.pact.common.util.ReflectionUtil; -import eu.stratosphere.pact.generic.io.InputFormat; -import eu.stratosphere.pact.generic.io.OutputFormat; /** * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}. @@ -36,6 +35,7 @@ import eu.stratosphere.pact.generic.io.OutputFormat; */ public class FormatUtil { + /** * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration} * initializes the format. @@ -52,12 +52,12 @@ public class FormatUtil { * @throws IOException * if an I/O error occurred while accessing the file or initializing the InputFormat. */ - public static T openInput( - Class inputFormatClass, String path, Configuration configuration) throws IOException { + public static > F openInput( + Class inputFormatClass, String path, Configuration configuration) throws IOException { configuration = configuration == null ? new Configuration() : configuration; Path normalizedPath = normalizePath(new Path(path)); - final T inputFormat = ReflectionUtil.newInstance(inputFormatClass); + final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, path); configuration.setLong(FileInputFormat.INPUT_STREAM_OPEN_TIMEOUT_KEY, 0); @@ -88,15 +88,15 @@ public class FormatUtil { * if an I/O error occurred while accessing the files or initializing the InputFormat. */ @SuppressWarnings("unchecked") - public static T[] openAllInputs( - Class inputFormatClass, String path, Configuration configuration) throws IOException { + public static > F[] openAllInputs( + Class inputFormatClass, String path, Configuration configuration) throws IOException { Path nephelePath = new Path(path); FileSystem fs = nephelePath.getFileSystem(); FileStatus fileStatus = fs.getFileStatus(nephelePath); if (!fileStatus.isDir()) - return (T[]) new FileInputFormat[] { openInput(inputFormatClass, path, configuration) }; + return (F[]) new FileInputFormat[] { openInput(inputFormatClass, path, configuration) }; FileStatus[] list = fs.listStatus(nephelePath); - T[] formats = (T[]) new FileInputFormat[list.length]; + F[] formats = (F[]) new FileInputFormat[list.length]; for (int index = 0; index < formats.length; index++) formats[index] = openInput(inputFormatClass, list[index].getPath().toString(), configuration); return formats; @@ -118,13 +118,13 @@ public class FormatUtil { * @throws IOException * if an I/O error occurred while accessing the file or initializing the OutputFormat. */ - public static T openOutput( - Class outputFormatClass, String path, Configuration configuration) throws IOException { - final T outputFormat = ReflectionUtil.newInstance(outputFormatClass); + public static > F openOutput( + Class outputFormatClass, String pathString, Configuration configuration) throws IOException { + final F outputFormat = ReflectionUtil.newInstance(outputFormatClass); configuration = configuration == null ? new Configuration() : configuration; - configuration.setString(FileOutputFormat.FILE_PARAMETER_KEY, path); + configuration.setString(FileOutputFormat.FILE_PARAMETER_KEY, pathString); configuration.setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, 0); outputFormat.configure(configuration); outputFormat.open(1); diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/SequentialInputFormat.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/SequentialInputFormat.java similarity index 77% rename from pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/SequentialInputFormat.java rename to pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/SequentialInputFormat.java index 1861e967882..ca2fdc661b1 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/SequentialInputFormat.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/SequentialInputFormat.java @@ -12,20 +12,20 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.DataInput; import java.io.IOException; -import eu.stratosphere.pact.common.type.PactRecord; +import eu.stratosphere.nephele.types.Record; /** - * Reads the {@link PactRecord}s from the native format which is deserializable without configuration. + * Reads the {@link Record}s from the native format which is deserializable without configuration. * * @author Arvid Heise * @see SequentialOutputFormat */ -public class SequentialInputFormat extends BinaryInputFormat { +public class SequentialInputFormat extends BinaryInputFormat { /* * (non-Javadoc) * @see @@ -33,7 +33,7 @@ public class SequentialInputFormat extends BinaryInputFormat { * , java.io.DataInput) */ @Override - protected void deserialize(PactRecord record, DataInput dataInput) throws IOException { + protected void deserialize(Record record, DataInput dataInput) throws IOException { record.read(dataInput); } } \ No newline at end of file diff --git a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/SequentialOutputFormat.java b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/SequentialOutputFormat.java similarity index 77% rename from pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/SequentialOutputFormat.java rename to pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/SequentialOutputFormat.java index 93811656276..1d9533ee24b 100644 --- a/pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/SequentialOutputFormat.java +++ b/pact/pact-common/src/main/java/eu/stratosphere/pact/generic/io/SequentialOutputFormat.java @@ -12,21 +12,21 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.DataOutput; import java.io.IOException; -import eu.stratosphere.pact.common.type.PactRecord; +import eu.stratosphere.nephele.types.Record; /** - * Stores complete {@link PactRecord}s in an efficient binary format which is deserializable without configuration. + * Stores complete {@link Record}s in an efficient binary format which is deserializable without configuration. * * @author Arvid Heise * @see BlockBasedOutputFormat * @see SequentialInputFormat */ -public class SequentialOutputFormat extends BinaryOutputFormat { +public class SequentialOutputFormat extends BinaryOutputFormat { /* * (non-Javadoc) * @see @@ -34,7 +34,7 @@ public class SequentialOutputFormat extends BinaryOutputFormat { * , java.io.DataOutput) */ @Override - protected void serialize(PactRecord record, DataOutput dataOutputStream) throws IOException { + protected void serialize(Record record, DataOutput dataOutputStream) throws IOException { record.write(dataOutputStream); } } \ No newline at end of file diff --git a/pact/pact-common/src/test/java/eu/stratosphere/pact/common/io/SequentialFormatTest.java b/pact/pact-common/src/test/java/eu/stratosphere/pact/generic/io/SequentialFormatTest.java similarity index 67% rename from pact/pact-common/src/test/java/eu/stratosphere/pact/common/io/SequentialFormatTest.java rename to pact/pact-common/src/test/java/eu/stratosphere/pact/generic/io/SequentialFormatTest.java index 14d5c598a86..fc400b103c7 100644 --- a/pact/pact-common/src/test/java/eu/stratosphere/pact/common/io/SequentialFormatTest.java +++ b/pact/pact-common/src/test/java/eu/stratosphere/pact/generic/io/SequentialFormatTest.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.common.io; +package eu.stratosphere.pact.generic.io; import java.io.DataOutputStream; import java.io.File; @@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.nephele.fs.FileInputSplit; import eu.stratosphere.nephele.fs.Path; +import eu.stratosphere.nephele.types.Record; import eu.stratosphere.pact.common.io.statistics.BaseStatistics; import eu.stratosphere.pact.common.type.PactRecord; import eu.stratosphere.pact.common.type.base.PactInteger; @@ -67,7 +68,7 @@ public class SequentialFormatTest { private int degreeOfParallelism; - private BlockInfo info = new SequentialInputFormat().createBlockInfo(); + private BlockInfo info = new SequentialInputFormat().createBlockInfo(); private int[] rawDataSizes; @@ -89,12 +90,12 @@ public class SequentialFormatTest { @Before public void calcRawDataSize() throws IOException { int recordIndex = 0; - for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { + for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) { ByteCounter byteCounter = new ByteCounter(); DataOutputStream out = new DataOutputStream(byteCounter); - for (int fileCount = 0; fileCount < getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) - getRecord(recordIndex).write(out); - rawDataSizes[fileIndex] = byteCounter.getLength(); + for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) + this.getRecord(recordIndex).write(out); + this.rawDataSizes[fileIndex] = byteCounter.getLength(); } } @@ -103,26 +104,27 @@ public class SequentialFormatTest { */ @Test public void checkInputSplits() throws IOException { - FileInputSplit[] inputSplits = createInputFormat().createInputSplits(0); + FileInputSplit[] inputSplits = this.createInputFormat().createInputSplits(0); Arrays.sort(inputSplits, new InputSplitSorter()); int splitIndex = 0; - for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { + for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) { List sameFileSplits = new ArrayList(); Path lastPath = inputSplits[splitIndex].getPath(); - for (; splitIndex < inputSplits.length; splitIndex++) + for (; splitIndex < inputSplits.length; splitIndex++) { if (!inputSplits[splitIndex].getPath().equals(lastPath)) break; - else - sameFileSplits.add(inputSplits[splitIndex]); + sameFileSplits.add(inputSplits[splitIndex]); + } - Assert.assertEquals(getExpectedBlockCount(fileIndex), sameFileSplits.size()); + Assert.assertEquals(this.getExpectedBlockCount(fileIndex), sameFileSplits.size()); - long lastBlockLength = rawDataSizes[fileIndex] % (blockSize - info.getInfoSize()) + info.getInfoSize(); + long lastBlockLength = + this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize()) + this.info.getInfoSize(); for (int index = 0; index < sameFileSplits.size(); index++) { - Assert.assertEquals(blockSize * index, sameFileSplits.get(index).getStart()); + Assert.assertEquals(this.blockSize * index, sameFileSplits.get(index).getStart()); if (index < sameFileSplits.size() - 1) - Assert.assertEquals(blockSize, sameFileSplits.get(index).getLength()); + Assert.assertEquals(this.blockSize, sameFileSplits.get(index).getLength()); } Assert.assertEquals(lastBlockLength, sameFileSplits.get(sameFileSplits.size() - 1).getLength()); } @@ -133,7 +135,7 @@ public class SequentialFormatTest { */ @Test public void checkRead() throws IOException { - SequentialInputFormat input = createInputFormat(); + SequentialInputFormat input = this.createInputFormat(); FileInputSplit[] inputSplits = input.createInputSplits(0); Arrays.sort(inputSplits, new InputSplitSorter()); int readCount = 0; @@ -142,32 +144,32 @@ public class SequentialFormatTest { PactRecord record = new PactRecord(); while (!input.reachedEnd()) if (input.nextRecord(record)) { - checkEquals(getRecord(readCount), record); + this.checkEquals(this.getRecord(readCount), record); readCount++; } } - Assert.assertEquals(numberOfTuples, readCount); + Assert.assertEquals(this.numberOfTuples, readCount); } /** * Tests the statistics of the given format. */ @Test - public void checkStatistics() throws IOException { - SequentialInputFormat input = createInputFormat(); + public void checkStatistics() { + SequentialInputFormat input = this.createInputFormat(); BaseStatistics statistics = input.getStatistics(null); - Assert.assertEquals(numberOfTuples, statistics.getNumberOfRecords()); + Assert.assertEquals(this.numberOfTuples, statistics.getNumberOfRecords()); } @After public void cleanup() { - deleteRecursively(tempFile); + this.deleteRecursively(this.tempFile); } private void deleteRecursively(File file) { if (file.isDirectory()) for (File subFile : file.listFiles()) - deleteRecursively(subFile); + this.deleteRecursively(subFile); else file.delete(); } @@ -177,57 +179,59 @@ public class SequentialFormatTest { */ @Before public void writeTuples() throws IOException { - tempFile = File.createTempFile("SequentialInputFormat", null); - tempFile.deleteOnExit(); + this.tempFile = File.createTempFile("SequentialInputFormat", null); + this.tempFile.deleteOnExit(); Configuration configuration = new Configuration(); - configuration.setLong(SequentialOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); - if (degreeOfParallelism == 1) { + configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); + if (this.degreeOfParallelism == 1) { SequentialOutputFormat output = - FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + tempFile.getAbsolutePath(), + FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + this.tempFile.getAbsolutePath(), configuration); - for (int index = 0; index < numberOfTuples; index++) - output.writeRecord(getRecord(index)); + for (int index = 0; index < this.numberOfTuples; index++) + output.writeRecord(this.getRecord(index)); output.close(); } else { - tempFile.delete(); - tempFile.mkdir(); + this.tempFile.delete(); + this.tempFile.mkdir(); int recordIndex = 0; - for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { + for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) { SequentialOutputFormat output = - FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + tempFile.getAbsolutePath() + "/" + FormatUtil.openOutput(SequentialOutputFormat.class, "file://" + this.tempFile.getAbsolutePath() + + "/" + (fileIndex + 1), configuration); - for (int fileCount = 0; fileCount < getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) - output.writeRecord(getRecord(recordIndex)); + for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) + output.writeRecord(this.getRecord(recordIndex)); output.close(); } } } - private int getNumberOfTuplesPerFile(int fileIndex) { - return numberOfTuples / degreeOfParallelism; + private int getNumberOfTuplesPerFile(@SuppressWarnings("unused") int fileIndex) { + return this.numberOfTuples / this.degreeOfParallelism; } /** * Tests if the length of the file matches the expected value. */ @Test - public void checkLength() throws IOException { - File[] files = tempFile.isDirectory() ? tempFile.listFiles() : new File[] { tempFile }; + public void checkLength() { + File[] files = this.tempFile.isDirectory() ? this.tempFile.listFiles() : new File[] { this.tempFile }; Arrays.sort(files); - for (int fileIndex = 0; fileIndex < degreeOfParallelism; fileIndex++) { - long lastBlockLength = rawDataSizes[fileIndex] % (blockSize - info.getInfoSize()); + for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) { + long lastBlockLength = this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize()); long expectedLength = - (getExpectedBlockCount(fileIndex) - 1) * blockSize + info.getInfoSize() + lastBlockLength; + (this.getExpectedBlockCount(fileIndex) - 1) * this.blockSize + this.info.getInfoSize() + + lastBlockLength; Assert.assertEquals(expectedLength, files[fileIndex].length()); } } - protected SequentialInputFormat createInputFormat() throws IOException { + protected SequentialInputFormat createInputFormat() { Configuration configuration = new Configuration(); - configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, "file://" + tempFile.getAbsolutePath()); - configuration.setLong(SequentialInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); + configuration.setString(FileInputFormat.FILE_PARAMETER_KEY, "file://" + this.tempFile.getAbsolutePath()); + configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); - final SequentialInputFormat inputFormat = new SequentialInputFormat(); + final SequentialInputFormat inputFormat = new SequentialInputFormat(); inputFormat.configure(configuration); return inputFormat; } @@ -249,7 +253,8 @@ public class SequentialFormatTest { } private int getExpectedBlockCount(int fileIndex) { - int expectedBlockCount = (int) Math.ceil((double) rawDataSizes[fileIndex] / (blockSize - info.getInfoSize())); + int expectedBlockCount = + (int) Math.ceil((double) this.rawDataSizes[fileIndex] / (this.blockSize - this.info.getInfoSize())); return expectedBlockCount; } @@ -258,7 +263,7 @@ public class SequentialFormatTest { ArrayList params = new ArrayList(); for (int dop = 1; dop <= 2; dop++) { // numberOfTuples, blockSize, dop - params.add(new Object[] { 100, SequentialOutputFormat.NATIVE_BLOCK_SIZE, dop }); + params.add(new Object[] { 100, BinaryOutputFormat.NATIVE_BLOCK_SIZE, dop }); params.add(new Object[] { 100, 1000, dop }); params.add(new Object[] { 100, 1 << 20, dop }); params.add(new Object[] { 10000, 1000, dop }); @@ -286,7 +291,7 @@ public class SequentialFormatTest { @Override public void write(int b) throws IOException { - length++; + this.length++; } } } -- GitLab