提交 98e659e5 编写于 作者: T Till Rohrmann 提交者: Stephan Ewen

[FLINK-994] Replaced DataInput and DataOutput with DataInputView and DataOutputView

上级 3f511953
......@@ -14,10 +14,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.avro;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
......@@ -50,7 +50,7 @@ public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroB
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
// the null flag
if (datum() == null) {
out.writeBoolean(false);
......@@ -64,7 +64,7 @@ public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroB
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
// the null flag
if (in.readBoolean()) {
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.hadoop.io.Writable;
import eu.stratosphere.types.Value;
......@@ -42,13 +42,13 @@ public class WritableWrapper<T extends Writable> implements Value {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeUTF(wrappedType);
wrapped.write(out);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
if(cl == null) {
cl = Thread.currentThread().getContextClassLoader();
}
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.hadoopcompatibility.mapred.wrapper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapred.JobConf;
......@@ -48,14 +48,14 @@ public class HadoopInputSplit implements InputSplit {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(splitNumber);
out.writeUTF(hadoopInputSplitTypeName);
hadoopInputSplit.write(out);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.splitNumber=in.readInt();
this.hadoopInputSplitTypeName = in.readUTF();
if(hadoopInputSplit == null) {
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.hadoopcompatibility.mapreduce.wrapper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapreduce.JobContext;
......@@ -50,7 +50,7 @@ public class HadoopInputSplit implements InputSplit {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(this.splitNumber);
out.writeUTF(this.mapreduceInputSplit.getClass().getName());
Writable w = (Writable) this.mapreduceInputSplit;
......@@ -58,7 +58,7 @@ public class HadoopInputSplit implements InputSplit {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.splitNumber=in.readInt();
String className = in.readUTF();
......
......@@ -13,8 +13,9 @@
package eu.stratosphere.api.common.accumulators;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
public class DoubleCounter implements SimpleAccumulator<Double> {
......@@ -44,12 +45,12 @@ public class DoubleCounter implements SimpleAccumulator<Double> {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeDouble(localValue);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.localValue = in.readDouble();
}
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.api.common.accumulators;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import com.google.common.collect.Maps;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* Histogram for discrete-data. Let's you populate a histogram distributedly.
......@@ -73,7 +73,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(hashMap.size());
for (Map.Entry<Integer, Integer> entry : hashMap.entrySet()) {
out.writeInt(entry.getKey());
......@@ -82,7 +82,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; ++i) {
hashMap.put(in.readInt(), in.readInt());
......
......@@ -13,8 +13,9 @@
package eu.stratosphere.api.common.accumulators;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
public class IntCounter implements SimpleAccumulator<Integer> {
......@@ -44,12 +45,12 @@ public class IntCounter implements SimpleAccumulator<Integer> {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(localValue);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
localValue = in.readInt();
}
......
......@@ -13,8 +13,9 @@
package eu.stratosphere.api.common.accumulators;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
......@@ -46,12 +47,12 @@ public class LongCounter implements SimpleAccumulator<Long> {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeLong(this.localValue);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.localValue = in.readLong();
}
......
......@@ -12,10 +12,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.distributions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.Key;
import eu.stratosphere.util.InstantiationUtil;
......@@ -123,7 +123,7 @@ public class SimpleDistribution implements DataDistribution {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(this.dim);
out.writeInt(boundaries.length);
......@@ -141,7 +141,7 @@ public class SimpleDistribution implements DataDistribution {
@SuppressWarnings("unchecked")
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.dim = in.readInt();
final int len = in.readInt();
......
......@@ -12,10 +12,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.distributions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.IntValue;
public class SimpleIntegerDistribution extends SimpleDistribution {
......@@ -113,7 +113,7 @@ public class SimpleIntegerDistribution extends SimpleDistribution {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(this.dim);
out.writeInt(boundaries.length);
......@@ -125,7 +125,7 @@ public class SimpleIntegerDistribution extends SimpleDistribution {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.dim = in.readInt();
final int len = in.readInt();
......
......@@ -12,10 +12,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.distributions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.DoubleValue;
......@@ -45,13 +45,13 @@ public class UniformDoubleDistribution implements DataDistribution {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeDouble(min);
out.writeDouble(max);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
min = in.readDouble();
max = in.readDouble();
}
......
......@@ -12,10 +12,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.distributions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.IntValue;
......@@ -46,13 +46,13 @@ public class UniformIntegerDistribution implements DataDistribution {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(min);
out.writeInt(max);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
min = in.readInt();
max = in.readInt();
}
......
......@@ -12,7 +12,6 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.io;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
......@@ -20,6 +19,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -213,7 +214,7 @@ public abstract class BinaryInputFormat<T extends IOReadableWritable> extends Fi
fdis.seek(file.getLen() - blockInfo.getInfoSize());
DataInputStream input = new DataInputStream(fdis);
blockInfo.read(input);
blockInfo.read(new InputViewDataInputStreamWrapper(input));
totalCount += blockInfo.getAccumulatedRecordCount();
}
......@@ -249,7 +250,7 @@ public abstract class BinaryInputFormat<T extends IOReadableWritable> extends Fi
// TODO: seek not supported by compressed streams. Will throw exception
this.stream.seek(this.splitStart + this.splitLength - this.blockInfo.getInfoSize());
DataInputStream infoStream = new DataInputStream(this.stream);
this.blockInfo.read(infoStream);
this.blockInfo.read(new InputViewDataInputStreamWrapper(infoStream));
}
this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
......@@ -269,12 +270,12 @@ public abstract class BinaryInputFormat<T extends IOReadableWritable> extends Fi
return null;
}
record = this.deserialize(record, this.dataInputStream);
record = this.deserialize(record, new InputViewDataInputStreamWrapper(this.dataInputStream));
this.readRecords++;
return record;
}
protected abstract T deserialize(T reuse, DataInput dataInput) throws IOException;
protected abstract T deserialize(T reuse, DataInputView dataInput) throws IOException;
/**
* Writes a block info at the end of the blocks.<br>
......
......@@ -12,7 +12,6 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.io;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
......@@ -20,6 +19,8 @@ import java.io.OutputStream;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
public abstract class BinaryOutputFormat<T extends IOReadableWritable> extends FileOutputFormat<T> {
......@@ -79,12 +80,12 @@ public abstract class BinaryOutputFormat<T extends IOReadableWritable> extends F
this.dataOutputStream = new DataOutputStream(this.blockBasedInput);
}
protected abstract void serialize(T record, DataOutput dataOutput) throws IOException;
protected abstract void serialize(T record, DataOutputView dataOutput) throws IOException;
@Override
public void writeRecord(T record) throws IOException {
this.blockBasedInput.startRecord();
this.serialize(record, this.dataOutputStream);
this.serialize(record, new OutputViewDataOutputStreamWrapper(this.dataOutputStream));
}
/**
......@@ -165,7 +166,7 @@ public abstract class BinaryOutputFormat<T extends IOReadableWritable> extends F
this.blockInfo.setAccumulatedRecordCount(this.totalCount);
this.blockInfo.setFirstRecordStart(this.firstRecordStartPos == NO_RECORD ? 0 : this.firstRecordStartPos);
BinaryOutputFormat.this.complementBlockInfo(this.blockInfo);
this.blockInfo.write(this.headerStream);
this.blockInfo.write(new OutputViewDataOutputStreamWrapper(this.headerStream));
this.blockPos = 0;
this.blockCount = 0;
this.firstRecordStartPos = NO_RECORD;
......
......@@ -12,11 +12,11 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
public class BlockInfo implements IOReadableWritable {
......@@ -50,14 +50,14 @@ public class BlockInfo implements IOReadableWritable {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeLong(this.recordCount);
out.writeLong(this.accumulatedRecordCount);
out.writeLong(this.firstRecordStart);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.recordCount = in.readLong();
this.accumulatedRecordCount = in.readLong();
this.firstRecordStart = in.readLong();
......
......@@ -12,10 +12,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.io;
import java.io.DataInput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
/**
* Reads elements by deserializing them with their regular serialization/deserialization functionality.
......@@ -27,7 +27,7 @@ public class SerializedInputFormat<T extends IOReadableWritable> extends BinaryI
private static final long serialVersionUID = 1L;
@Override
protected T deserialize(T reuse, DataInput dataInput) throws IOException {
protected T deserialize(T reuse, DataInputView dataInput) throws IOException {
reuse.read(dataInput);
return reuse;
}
......
......@@ -12,10 +12,10 @@
**********************************************************************************************************************/
package eu.stratosphere.api.common.io;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataOutputView;
/**
* Stores elements by serializing them with their regular serialization/deserialization functionality.
......@@ -27,7 +27,7 @@ public class SerializedOutputFormat extends BinaryOutputFormat<IOReadableWritabl
private static final long serialVersionUID = 1L;
@Override
protected void serialize(IOReadableWritable record, DataOutput dataOutputStream) throws IOException {
record.write(dataOutputStream);
protected void serialize(IOReadableWritable record, DataOutputView dataOutputView) throws IOException {
record.write(dataOutputView);
}
}
......@@ -13,8 +13,6 @@
package eu.stratosphere.configuration;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
......@@ -22,6 +20,8 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.commons.codec.binary.Base64;
import eu.stratosphere.core.io.IOReadableWritable;
......@@ -449,7 +449,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
// --------------------------------------------------------------------------------------------
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
synchronized (this.confData) {
......@@ -465,7 +465,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
synchronized (this.confData) {
......
......@@ -19,12 +19,12 @@
package eu.stratosphere.core.fs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* A file input split provides information on a particular part of a file, possibly
......@@ -143,7 +143,7 @@ public class FileInputSplit implements InputSplit {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// write partition number
out.writeInt(this.partitionNumber);
......@@ -173,7 +173,7 @@ public class FileInputSplit implements InputSplit {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// read partition number
this.partitionNumber = in.readInt();
......
......@@ -19,8 +19,6 @@
package eu.stratosphere.core.fs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
......@@ -29,6 +27,8 @@ import java.net.URISyntaxException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.OperatingSystem;
import eu.stratosphere.util.StringUtils;
......@@ -451,7 +451,7 @@ public class Path implements IOReadableWritable, Serializable {
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
final boolean isNotNull = in.readBoolean();
if (isNotNull) {
......@@ -474,7 +474,7 @@ public class Path implements IOReadableWritable, Serializable {
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
if (uri == null) {
out.writeBoolean(false);
......
......@@ -13,8 +13,9 @@
package eu.stratosphere.core.io;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
/**
......@@ -53,13 +54,13 @@ public class GenericInputSplit implements InputSplit {
// --------------------------------------------------------------------------------------------
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(this.partitionNumber);
out.writeInt(this.totalNumberOfPartitions);
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.partitionNumber = in.readInt();
this.totalNumberOfPartitions = in.readInt();
}
......
......@@ -13,8 +13,9 @@
package eu.stratosphere.core.io;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
/**
......@@ -27,23 +28,23 @@ import java.io.IOException;
public interface IOReadableWritable {
/**
* Writes the object's internal data to the given data output stream.
* Writes the object's internal data to the given data output view.
*
* @param out
* the output stream to receive the data.
* the output view to receive the data.
* @throws IOException
* thrown if any error occurs while writing to the output stream
*/
void write(DataOutput out) throws IOException;
void write(DataOutputView out) throws IOException;
/**
* Reads the object's internal data from the given data input stream.
* Reads the object's internal data from the given data input view.
*
* @param in
* the input stream to read the data from
* the input view to read the data from
* @throws IOException
* thrown if any error occurs while reading from the input stream
*/
void read(DataInput in) throws IOException;
void read(DataInputView in) throws IOException;
}
......@@ -13,8 +13,9 @@
package eu.stratosphere.core.io;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
......@@ -68,7 +69,7 @@ public class LocatableInputSplit implements InputSplit {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write the split number
out.writeInt(this.splitNumber);
......@@ -87,7 +88,7 @@ public class LocatableInputSplit implements InputSplit {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read the split number
this.splitNumber = in.readInt();
......
......@@ -34,6 +34,8 @@ import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.Arrays;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.types.Value;
/**
......@@ -287,8 +289,9 @@ public class StringRecord implements Value {
/**
* deserialize
* @param in
*/
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
final int newLength = in.readInt();
setCapacity(newLength, false);
in.readFully(this.bytes, 0, newLength);
......@@ -316,7 +319,7 @@ public class StringRecord implements Value {
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.length);
out.write(this.bytes, 0, this.length);
}
......
......@@ -35,4 +35,26 @@ public interface DataInputView extends DataInput {
* be advanced to the desired position.
*/
public void skipBytesToRead(int numBytes) throws IOException;
/**
* Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
* It returns the number of read bytes or -1 if there is no more data left.
*
* @param b byte array to store the data to
* @param off offset into byte array
* @param len byte length to read
* @return the number of actually read bytes of -1 if there is no more data left
* @throws IOException
*/
public int read(byte[] b, int off, int len) throws IOException;
/**
* Tries to fill the given byte array {@code b}. Returns the actually number of read bytes or -1 if there is no
* more data.
*
* @param b byte array to store the data to
* @return the number of read bytes or -1 if there is no more data left
* @throws IOException
*/
public int read(byte[] b) throws IOException;
}
/***********************************************************************************************************************
*
/*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
......@@ -10,104 +9,112 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
*/
package eu.stratosphere.core.memory;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
/**
* A utility that presents a {@link DataInput} as a {@link DataInputView}.
*/
public class InputViewDataInputWrapper implements DataInputView {
private DataInput delegate;
public void setDelegate(DataInput delegate) {
this.delegate = delegate;
public class InputViewDataInputStreamWrapper implements DataInputView {
private final DataInputStream in;
public InputViewDataInputStreamWrapper(DataInputStream in){
this.in = in;
}
@Override
public void skipBytesToRead(int numBytes) throws IOException {
int result = in.skipBytes(numBytes);
if(result != numBytes){
throw new EOFException("Could not skip " + numBytes + " bytes.");
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override
public void readFully(byte[] b) throws IOException {
this.delegate.readFully(b);
in.readFully(b);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
this.delegate.readFully(b, off, len);
in.readFully(b, off, len);
}
@Override
public int skipBytes(int n) throws IOException {
return this.delegate.skipBytes(n);
return in.skipBytes(n);
}
@Override
public boolean readBoolean() throws IOException {
return this.delegate.readBoolean();
return in.readBoolean();
}
@Override
public byte readByte() throws IOException {
return this.delegate.readByte();
return in.readByte();
}
@Override
public int readUnsignedByte() throws IOException {
return this.delegate.readUnsignedByte();
return in.readUnsignedByte();
}
@Override
public short readShort() throws IOException {
return this.delegate.readShort();
return in.readShort();
}
@Override
public int readUnsignedShort() throws IOException {
return this.delegate.readUnsignedShort();
return in.readUnsignedShort();
}
@Override
public char readChar() throws IOException {
return this.delegate.readChar();
return in.readChar();
}
@Override
public int readInt() throws IOException {
return this.delegate.readInt();
return in.readInt();
}
@Override
public long readLong() throws IOException {
return this.delegate.readLong();
return in.readLong();
}
@Override
public float readFloat() throws IOException {
return this.delegate.readFloat();
return in.readFloat();
}
@Override
public double readDouble() throws IOException {
return this.delegate.readDouble();
return in.readDouble();
}
@Override
public String readLine() throws IOException {
return this.delegate.readLine();
return in.readLine();
}
@Override
public String readUTF() throws IOException {
return this.delegate.readUTF();
}
@Override
public void skipBytesToRead(int numBytes) throws IOException {
for (int i = 0; i < numBytes; i++) {
this.delegate.readByte();
}
return in.readUTF();
}
}
/*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package eu.stratosphere.core.memory;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
public class InputViewObjectInputStreamWrapper implements DataInputView {
private final ObjectInputStream in;
public InputViewObjectInputStreamWrapper(ObjectInputStream in){
this.in = in;
}
@Override
public void skipBytesToRead(int numBytes) throws IOException {
int skippedBytes = in.skipBytes(numBytes);
if(skippedBytes < numBytes){
throw new EOFException("Could not skip " + numBytes + " bytes.");
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override
public void readFully(byte[] b) throws IOException {
in.readFully(b);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
in.readFully(b, off, len);
}
@Override
public int skipBytes(int n) throws IOException {
return in.skipBytes(n);
}
@Override
public boolean readBoolean() throws IOException {
return in.readBoolean();
}
@Override
public byte readByte() throws IOException {
return in.readByte();
}
@Override
public int readUnsignedByte() throws IOException {
return in.readUnsignedByte();
}
@Override
public short readShort() throws IOException {
return in.readShort();
}
@Override
public int readUnsignedShort() throws IOException {
return in.readUnsignedShort();
}
@Override
public char readChar() throws IOException {
return in.readChar();
}
@Override
public int readInt() throws IOException {
return in.readInt();
}
@Override
public long readLong() throws IOException {
return in.readLong();
}
@Override
public float readFloat() throws IOException {
return in.readFloat();
}
@Override
public double readDouble() throws IOException {
return in.readDouble();
}
@Override
public String readLine() throws IOException {
return in.readLine();
}
@Override
public String readUTF() throws IOException {
return in.readUTF();
}
}
/***********************************************************************************************************************
*
/*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
......@@ -10,107 +9,102 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
*/
package eu.stratosphere.core.memory;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
/**
* A utility that presents a {@link DataOutput} as a {@link DataOutputView}.
*/
public class OutputViewDataOutputWrapper implements DataOutputView {
private DataOutput delegate;
public void setDelegate(DataOutput delegate) {
this.delegate = delegate;
public class OutputViewDataOutputStreamWrapper implements DataOutputView {
private final DataOutputStream out;
public OutputViewDataOutputStreamWrapper(DataOutputStream out){
this.out = out;
}
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
byte[] bytes = new byte[numBytes];
Arrays.fill(bytes, (byte)0);
out.write(bytes);
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
byte[] buffer = new byte[numBytes];
source.read(buffer);
out.write(buffer);
}
@Override
public void write(int b) throws IOException {
this.delegate.write(b);
out.write(b);
}
@Override
public void write(byte[] b) throws IOException {
this.delegate.write(b);
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
this.delegate.write(b, off, len);
out.write(b, off, len);
}
@Override
public void writeBoolean(boolean v) throws IOException {
this.delegate.writeBoolean(v);
out.writeBoolean(v);
}
@Override
public void writeByte(int v) throws IOException {
this.delegate.writeByte(v);
out.writeByte(v);
}
@Override
public void writeShort(int v) throws IOException {
this.delegate.writeShort(v);
out.writeShort(v);
}
@Override
public void writeChar(int v) throws IOException {
this.delegate.writeChar(v);
out.writeChar(v);
}
@Override
public void writeInt(int v) throws IOException {
this.delegate.writeInt(v);
out.writeInt(v);
}
@Override
public void writeLong(long v) throws IOException {
this.delegate.writeLong(v);
out.writeLong(v);
}
@Override
public void writeFloat(float v) throws IOException {
this.delegate.writeFloat(v);
out.writeFloat(v);
}
@Override
public void writeDouble(double v) throws IOException {
this.delegate.writeDouble(v);
out.writeDouble(v);
}
@Override
public void writeBytes(String s) throws IOException {
this.delegate.writeBytes(s);
out.writeBytes(s);
}
@Override
public void writeChars(String s) throws IOException {
this.delegate.writeChars(s);
out.writeChars(s);
}
@Override
public void writeUTF(String s) throws IOException {
this.delegate.writeUTF(s);
}
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
// skip by writing zeros.
for (int i = 0; i < numBytes; i++) {
this.delegate.writeByte(0);
}
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
for (int i = 0; i < numBytes; i++) {
this.delegate.writeByte(source.readUnsignedByte());
}
out.writeUTF(s);
}
}
/*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package eu.stratosphere.core.memory;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
public class OutputViewObjectOutputStreamWrapper implements DataOutputView {
private final ObjectOutputStream out;
public OutputViewObjectOutputStreamWrapper(ObjectOutputStream out){
this.out = out;
}
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
byte[] buffer = new byte[numBytes];
Arrays.fill(buffer, (byte) 0);
out.write(buffer);
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
byte[] buffer = new byte[numBytes];
source.readFully(buffer);
out.write(buffer);
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
@Override
public void writeBoolean(boolean v) throws IOException {
out.writeBoolean(v);
}
@Override
public void writeByte(int v) throws IOException {
out.writeByte(v);
}
@Override
public void writeShort(int v) throws IOException {
out.writeShort(v);
}
@Override
public void writeChar(int v) throws IOException {
out.writeChar(v);
}
@Override
public void writeInt(int v) throws IOException {
out.writeInt(v);
}
@Override
public void writeLong(long v) throws IOException {
out.writeLong(v);
}
@Override
public void writeFloat(float v) throws IOException {
out.writeFloat(v);
}
@Override
public void writeDouble(double v) throws IOException {
out.writeDouble(v);
}
@Override
public void writeBytes(String s) throws IOException {
out.writeBytes(s);
}
@Override
public void writeChars(String s) throws IOException {
out.writeChars(s);
}
@Override
public void writeUTF(String s) throws IOException {
out.writeUTF(s);
}
}
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -70,12 +68,12 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
// --------------------------------------------------------------------------------------------
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeBoolean(this.value);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readBoolean();
}
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -75,12 +73,12 @@ public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<By
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readByte();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeByte(this.value);
}
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -80,12 +78,12 @@ public class CharValue implements NormalizableKey<CharValue>, ResettableValue<Ch
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readChar();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeChar(this.value);
}
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -75,12 +73,12 @@ public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValu
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readDouble();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeDouble(this.value);
}
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -75,12 +73,12 @@ public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>,
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readFloat();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeFloat(this.value);
}
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -80,12 +78,12 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeInt(this.value);
}
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
......@@ -22,6 +20,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.ReflectionUtil;
/**
......@@ -83,7 +83,7 @@ public abstract class ListValue<V extends Value> implements Value, List<V> {
* @see eu.stratosphere.nephele.io.IOReadableWritable#read(java.io.DataInput)
*/
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
int size = in.readInt();
this.list.clear();
......@@ -106,7 +106,7 @@ public abstract class ListValue<V extends Value> implements Value, List<V> {
* @see eu.stratosphere.nephele.io.IOReadableWritable#write(java.io.DataOutput)
*/
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.list.size());
for (final V value : this.list) {
value.write(out);
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -88,7 +86,7 @@ public class LongValue implements NormalizableKey<LongValue>, ResettableValue<Lo
* @see eu.stratosphere.nephele.io.IOReadableWritable#read(java.io.DataInput)
*/
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.value = in.readLong();
}
......@@ -97,7 +95,7 @@ public class LongValue implements NormalizableKey<LongValue>, ResettableValue<Lo
* @see eu.stratosphere.nephele.io.IOReadableWritable#write(java.io.DataOutput)
*/
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeLong(this.value);
}
......
......@@ -13,14 +13,14 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.ReflectionUtil;
/**
......@@ -73,7 +73,7 @@ public abstract class MapValue<K extends Value, V extends Value> implements Valu
* @see eu.stratosphere.nephele.io.IOReadableWritable#read(java.io.DataInput)
*/
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
int size = in.readInt();
this.map.clear();
......@@ -97,7 +97,7 @@ public abstract class MapValue<K extends Value, V extends Value> implements Valu
* @see eu.stratosphere.nephele.io.IOReadableWritable#write(java.io.DataOutput)
*/
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.map.size());
for (final Entry<K, V> entry : this.map.entrySet()) {
entry.getKey().write(out);
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -60,12 +58,12 @@ public final class NullValue implements NormalizableKey<NullValue>, CopyableValu
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
in.readBoolean();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeBoolean(false);
}
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.ReflectionUtil;
/**
......@@ -123,13 +123,13 @@ public abstract class Pair<U extends Key<U>, V extends Key<V>> implements Key<Pa
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.first.read(in);
this.second.read(in);
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
this.first.write(out);
this.second.write(out);
}
......
......@@ -1070,7 +1070,7 @@ public final class Record implements Value, CopyableValue<Record> {
// --------------------------------------------------------------------------------------------
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
// make sure everything is in a valid binary representation
updateBinaryRepresenation();
......@@ -1080,7 +1080,7 @@ public final class Record implements Value, CopyableValue<Record> {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
final int len = readVarLengthInt(in);
this.binaryLen = len;
......@@ -1187,7 +1187,7 @@ public final class Record implements Value, CopyableValue<Record> {
}
/**
* Writes this record to the given output view. This method is similar to {@link #write(DataOutput)}, but
* Writes this record to the given output view. This method is similar to {@link eu.stratosphere.core.io.IOReadableWritable#write(eu.stratosphere.core.memory.DataOutputView)}, but
* it returns the number of bytes written.
*
* @param target The view to write the record to.
......@@ -1262,12 +1262,12 @@ public final class Record implements Value, CopyableValue<Record> {
private static final long serialVersionUID = 1L;
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
throw new UnsupportedOperationException();
}
};
......@@ -1275,7 +1275,7 @@ public final class Record implements Value, CopyableValue<Record> {
/**
* Internal interface class to provide serialization for the data types.
*/
private static final class InternalDeSerializer implements DataInput, DataOutput, Serializable {
private static final class InternalDeSerializer implements DataInputView, DataOutputView, Serializable {
private static final long serialVersionUID = 1L;
......@@ -1510,6 +1510,49 @@ public final class Record implements Value, CopyableValue<Record> {
return n;
}
}
@Override
public void skipBytesToRead(int numBytes) throws IOException {
if(this.end - this.position < numBytes) {
throw new EOFException("Could not skip " + numBytes + ".");
}
skipBytes(numBytes);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if(b == null){
throw new NullPointerException("Byte array b cannot be null.");
}
if(off < 0){
throw new IndexOutOfBoundsException("Offset cannot be negative.");
}
if(len < 0){
throw new IndexOutOfBoundsException("Length cannot be negative.");
}
if(b.length - off < len){
throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
".");
}
if(this.position >= this.end){
return -1;
}else{
int toRead = Math.min(this.end-this.position, len);
System.arraycopy(this.memory,this.position,b,off,toRead);
this.position += toRead;
return toRead;
}
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
// ----------------------------------------------------------------------------------------
// Data Output
......@@ -1736,5 +1779,24 @@ public final class Record implements Value, CopyableValue<Record> {
private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
int skippedBytes = skipBytes(numBytes);
if(skippedBytes != numBytes){
throw new EOFException("Could not skip " + numBytes + " bytes.");
}
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
if(numBytes > this.end - this.position){
throw new IOException("Could not write " + numBytes + " bytes since the buffer is full.");
}
source.read(this.memory,this.position, numBytes);
this.position += numBytes;
}
}
}
......@@ -13,8 +13,6 @@
package eu.stratosphere.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
......@@ -80,12 +78,12 @@ public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.value = in.readShort();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeShort(this.value);
}
......
......@@ -470,7 +470,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
// --------------------------------------------------------------------------------------------
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
int len = in.readUnsignedByte();
if (len >= HIGH_BIT) {
......@@ -508,7 +508,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
int len = this.len;
// write the length, variable-length encoded
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.api.common.io;
import java.io.DataInput;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.BeforeClass;
......@@ -35,7 +35,7 @@ public class BinaryInputFormatTest {
private static final long serialVersionUID = 1L;
@Override
protected Record deserialize(Record record, DataInput dataInput) {
protected Record deserialize(Record record, DataInputView dataInput) {
return record;
}
}
......
......@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
......@@ -92,7 +93,7 @@ public class SequentialFormatTest {
ByteCounter byteCounter = new ByteCounter();
DataOutputStream out = new DataOutputStream(byteCounter);
for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
this.getRecord(recordIndex).write(out);
this.getRecord(recordIndex).write(new OutputViewDataOutputStreamWrapper(out));
}
this.rawDataSizes[fileIndex] = byteCounter.getLength();
}
......
......@@ -19,6 +19,8 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
import junit.framework.Assert;
import org.junit.Test;
......@@ -107,7 +109,7 @@ public class SimpleDataDistributionTest {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);
try {
ddWrite.write(dos);
ddWrite.write(new OutputViewDataOutputStreamWrapper(dos));
} catch (IOException e) {
Assert.fail("Error serializing the DataDistribution: " + e.getMessage());
}
......@@ -120,7 +122,7 @@ public class SimpleDataDistributionTest {
SimpleDistribution ddRead = new SimpleDistribution();
try {
ddRead.read(in);
ddRead.read(new InputViewDataInputStreamWrapper(in));
} catch (Exception ex) {
Assert.fail("The deserialization of the encoded data distribution caused an error");
}
......
......@@ -25,6 +25,8 @@ import java.io.IOException;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
/**
* This class contains auxiliary methods for unit tests in the Nephele common module.
......@@ -95,7 +97,7 @@ public class CommonTestUtils {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);
original.write(dos);
original.write(new OutputViewDataOutputStreamWrapper(dos));
final String className = original.getClass().getName();
if (className == null) {
......@@ -130,7 +132,7 @@ public class CommonTestUtils {
final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
final DataInputStream dis = new DataInputStream(bais);
copy.read(dis);
copy.read(new InputViewDataInputStreamWrapper(dis));
return copy;
}
......
......@@ -22,6 +22,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map.Entry;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
import junit.framework.Assert;
import org.junit.Before;
......@@ -58,8 +60,8 @@ public class CollectionsDataTypeTest {
try {
NfIntStringPair mPairActual = new NfIntStringPair();
pair1.write(out);
mPairActual.read(in);
pair1.write(new OutputViewDataOutputStreamWrapper(out));
mPairActual.read(new InputViewDataInputStreamWrapper(in));
Assert.assertEquals(pair1, mPairActual);
} catch (IOException e) {
......@@ -182,8 +184,8 @@ public class CollectionsDataTypeTest {
// now test data transfer
NfIntStringMap nMap = new NfIntStringMap();
try {
map0.write(out);
nMap.read(in);
map0.write(new OutputViewDataOutputStreamWrapper(out));
nMap.read(new InputViewDataInputStreamWrapper(in));
} catch (Exception e) {
Assert.assertTrue(false);
}
......@@ -210,8 +212,8 @@ public class CollectionsDataTypeTest {
// test data transfer
NfStringList mList2 = new NfStringList();
try {
list.write(out);
mList2.read(in);
list.write(new OutputViewDataOutputStreamWrapper(out));
mList2.read(new InputViewDataInputStreamWrapper(in));
} catch (Exception e) {
Assert.assertTrue(false);
}
......
......@@ -19,6 +19,8 @@ import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
import junit.framework.Assert;
import org.junit.Before;
......@@ -56,15 +58,15 @@ public class PrimitiveDataTypeTest {
Assert.assertEquals(int0.compareTo(int3), -1);
// test stream output and retrieval
try {
int0.write(mOut);
int2.write(mOut);
int3.write(mOut);
int0.write(new OutputViewDataOutputStreamWrapper(mOut));
int2.write(new OutputViewDataOutputStreamWrapper(mOut));
int3.write(new OutputViewDataOutputStreamWrapper(mOut));
IntValue int1n = new IntValue();
IntValue int2n = new IntValue();
IntValue int3n = new IntValue();
int1n.read(mIn);
int2n.read(mIn);
int3n.read(mIn);
int1n.read(new InputViewDataInputStreamWrapper(mIn));
int2n.read(new InputViewDataInputStreamWrapper(mIn));
int3n.read(new InputViewDataInputStreamWrapper(mIn));
Assert.assertEquals(int0.compareTo(int1n), 0);
Assert.assertEquals(int0.getValue(), int1n.getValue());
Assert.assertEquals(int2.compareTo(int2n), 0);
......@@ -92,15 +94,15 @@ public class PrimitiveDataTypeTest {
Assert.assertEquals(double0.compareTo(double3), -1);
// test stream output and retrieval
try {
double0.write(mOut);
double2.write(mOut);
double3.write(mOut);
double0.write(new OutputViewDataOutputStreamWrapper(mOut));
double2.write(new OutputViewDataOutputStreamWrapper(mOut));
double3.write(new OutputViewDataOutputStreamWrapper(mOut));
DoubleValue double1n = new DoubleValue();
DoubleValue double2n = new DoubleValue();
DoubleValue double3n = new DoubleValue();
double1n.read(mIn);
double2n.read(mIn);
double3n.read(mIn);
double1n.read(new InputViewDataInputStreamWrapper(mIn));
double2n.read(new InputViewDataInputStreamWrapper(mIn));
double3n.read(new InputViewDataInputStreamWrapper(mIn));
Assert.assertEquals(double0.compareTo(double1n), 0);
Assert.assertEquals(double0.getValue(), double1n.getValue());
Assert.assertEquals(double2.compareTo(double2n), 0);
......@@ -156,21 +158,21 @@ public class PrimitiveDataTypeTest {
// test stream out/input
try {
string0.write(mOut);
string4.write(mOut);
string2.write(mOut);
string3.write(mOut);
string7.write(mOut);
string0.write(new OutputViewDataOutputStreamWrapper(mOut));
string4.write(new OutputViewDataOutputStreamWrapper(mOut));
string2.write(new OutputViewDataOutputStreamWrapper(mOut));
string3.write(new OutputViewDataOutputStreamWrapper(mOut));
string7.write(new OutputViewDataOutputStreamWrapper(mOut));
StringValue string1n = new StringValue();
StringValue string2n = new StringValue();
StringValue string3n = new StringValue();
StringValue string4n = new StringValue();
StringValue string7n = new StringValue();
string1n.read(mIn);
string4n.read(mIn);
string2n.read(mIn);
string3n.read(mIn);
string7n.read(mIn);
string1n.read(new InputViewDataInputStreamWrapper(mIn));
string4n.read(new InputViewDataInputStreamWrapper(mIn));
string2n.read(new InputViewDataInputStreamWrapper(mIn));
string3n.read(new InputViewDataInputStreamWrapper(mIn));
string7n.read(new InputViewDataInputStreamWrapper(mIn));
Assert.assertEquals(string0.compareTo(string1n), 0);
Assert.assertEquals(string0.toString(), string1n.toString());
Assert.assertEquals(string4.compareTo(string4n), 0);
......@@ -209,12 +211,12 @@ public class PrimitiveDataTypeTest {
try {
// write it multiple times
for (int i = 0; i < numWrites; i++) {
pn.write(mOut);
pn.write(new OutputViewDataOutputStreamWrapper(mOut));
}
// read it multiple times
for (int i = 0; i < numWrites; i++) {
pn.read(mIn);
pn.read(new InputViewDataInputStreamWrapper(mIn));
}
Assert.assertEquals("Reading PactNull does not consume the same data as was written.", mIn.available(), 0);
......
......@@ -26,6 +26,8 @@ import java.io.PipedOutputStream;
import java.util.Arrays;
import java.util.Random;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -59,15 +61,15 @@ public class RecordTest {
try {
// test deserialize into self
Record empty = new Record();
empty.write(this.out);
empty.read(this.in);
empty.write(new OutputViewDataOutputStreamWrapper(this.out));
empty.read(new InputViewDataInputStreamWrapper(this.in));
Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0);
// test deserialize into new
empty = new Record();
empty.write(this.out);
empty.write(new OutputViewDataOutputStreamWrapper(this.out));
empty = new Record();
empty.read(this.in);
empty.read(new InputViewDataInputStreamWrapper(this.in));
Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0);
} catch (Throwable t) {
......@@ -382,18 +384,18 @@ public class RecordTest {
try {
// serialize and deserialize to remove all buffered info
r.write(out);
r.write(new OutputViewDataOutputStreamWrapper(out));
r = new Record();
r.read(in);
r.read(new InputViewDataInputStreamWrapper(in));
r.setField(1, new IntValue(10));
r.setField(4, new StringValue("Some long value"));
r.setField(5, new StringValue("An even longer value"));
r.setField(10, new IntValue(10));
r.write(out);
r.write(new OutputViewDataOutputStreamWrapper(out));
r = new Record();
r.read(in);
r.read(new InputViewDataInputStreamWrapper(in));
assertTrue(r.getField(0, IntValue.class).getValue() == 0);
assertTrue(r.getField(1, IntValue.class).getValue() == 10);
......@@ -427,8 +429,8 @@ public class RecordTest {
Record record2 = new Record();
try {
// De/Serialize the record
record1.write(this.out);
record2.read(this.in);
record1.write(new OutputViewDataOutputStreamWrapper(this.out));
record2.read(new InputViewDataInputStreamWrapper(this.in));
assertTrue(record1.getNumFields() == record2.getNumFields());
......@@ -456,20 +458,20 @@ public class RecordTest {
try {
Record record = new Record(new IntValue(42));
record.write(out);
record.write(new OutputViewDataOutputStreamWrapper(out));
Assert.assertEquals(42, record.getField(0, IntValue.class).getValue());
record.setField(0, new IntValue(23));
record.write(out);
record.write(new OutputViewDataOutputStreamWrapper(out));
Assert.assertEquals(23, record.getField(0, IntValue.class).getValue());
record.clear();
Assert.assertEquals(0, record.getNumFields());
Record record2 = new Record(new IntValue(42));
record2.read(in);
record2.read(new InputViewDataInputStreamWrapper(in));
Assert.assertEquals(42, record2.getField(0, IntValue.class).getValue());
record2.read(in);
record2.read(new InputViewDataInputStreamWrapper(in));
Assert.assertEquals(23, record2.getField(0, IntValue.class).getValue());
} catch (Throwable t) {
Assert.fail("Test failed due to an exception: " + t.getMessage());
......@@ -541,7 +543,8 @@ public class RecordTest {
}
}
static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInput reader, DataOutput writer)
static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInputStream reader,
DataOutputStream writer)
throws Exception
{
final int[] permutation1 = createPermutation(rnd, values.length);
......@@ -586,9 +589,9 @@ public class RecordTest {
final int pos = permutation1[i];
rec.setField(pos, values[pos]);
}
rec.write(writer);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec = new Record();
rec.read(reader);
rec.read(new InputViewDataInputStreamWrapper(reader));
testAllRetrievalMethods(rec, permutation2, values);
// test adding and retrieving with full stream serialization and deserialization into the same record
......@@ -597,8 +600,8 @@ public class RecordTest {
final int pos = permutation1[i];
rec.setField(pos, values[pos]);
}
rec.write(writer);
rec.read(reader);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec.read(new InputViewDataInputStreamWrapper(reader));
testAllRetrievalMethods(rec, permutation2, values);
// test adding and retrieving with partial stream serialization and deserialization into a new record
......@@ -606,18 +609,18 @@ public class RecordTest {
updatePos = rnd.nextInt(values.length + 1);
for (int i = 0; i < values.length; i++) {
if (i == updatePos) {
rec.write(writer);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec = new Record();
rec.read(reader);
rec.read(new InputViewDataInputStreamWrapper(reader));
}
final int pos = permutation1[i];
rec.setField(pos, values[pos]);
}
if (updatePos == values.length) {
rec.write(writer);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec = new Record();
rec.read(reader);
rec.read(new InputViewDataInputStreamWrapper(reader));
}
testAllRetrievalMethods(rec, permutation2, values);
......@@ -626,16 +629,16 @@ public class RecordTest {
updatePos = rnd.nextInt(values.length + 1);
for (int i = 0; i < values.length; i++) {
if (i == updatePos) {
rec.write(writer);
rec.read(reader);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec.read(new InputViewDataInputStreamWrapper(reader));
}
final int pos = permutation1[i];
rec.setField(pos, values[pos]);
}
if (updatePos == values.length) {
rec.write(writer);
rec.read(reader);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec.read(new InputViewDataInputStreamWrapper(reader));
}
testAllRetrievalMethods(rec, permutation2, values);
......@@ -644,17 +647,17 @@ public class RecordTest {
updatePos = rnd.nextInt(values.length + 1);
for (int i = 0; i < values.length; i++) {
if (i == updatePos) {
rec.write(writer);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec = new Record();
rec.read(reader);
rec.read(new InputViewDataInputStreamWrapper(reader));
}
final int pos = permutation1[i];
rec.setField(pos, values[pos]);
}
rec.write(writer);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec = new Record();
rec.read(reader);
rec.read(new InputViewDataInputStreamWrapper(reader));
testAllRetrievalMethods(rec, permutation2, values);
// test adding and retrieving with partial stream serialization and deserialization into the same record
......@@ -662,15 +665,15 @@ public class RecordTest {
updatePos = rnd.nextInt(values.length + 1);
for (int i = 0; i < values.length; i++) {
if (i == updatePos) {
rec.write(writer);
rec.read(reader);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec.read(new InputViewDataInputStreamWrapper(reader));
}
final int pos = permutation1[i];
rec.setField(pos, values[pos]);
}
rec.write(writer);
rec.read(reader);
rec.write(new OutputViewDataOutputStreamWrapper(writer));
rec.read(new InputViewDataInputStreamWrapper(reader));
testAllRetrievalMethods(rec, permutation2, values);
}
......
......@@ -27,8 +27,8 @@ import eu.stratosphere.api.common.io.GenericInputFormat;
import eu.stratosphere.api.common.io.NonParallelInput;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.core.memory.InputViewDataInputWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputWrapper;
import eu.stratosphere.core.memory.InputViewObjectInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewObjectOutputStreamWrapper;
/**
* An input format that returns objects from a collection.
......@@ -78,11 +78,8 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
out.defaultWriteObject();
out.writeInt(dataSet.size());
OutputViewDataOutputWrapper outWrapper = new OutputViewDataOutputWrapper();
outWrapper.setDelegate(out);
for (T element : dataSet){
serializer.serialize(element, outWrapper);
serializer.serialize(element, new OutputViewObjectOutputStreamWrapper(out));
}
}
......@@ -92,12 +89,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
InputViewDataInputWrapper inWrapper = new InputViewDataInputWrapper();
inWrapper.setDelegate(in);
for (int i = 0; i < collectionLength; i++){
T element = serializer.createInstance();
element = serializer.deserialize(element, inWrapper);
element = serializer.deserialize(element, new InputViewObjectInputStreamWrapper(in));
list.add(element);
}
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.api.java.record.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* The ExternalProcessInputSplit contains all informations for {@link InputFormat} that read their data from external processes.
......@@ -58,13 +58,13 @@ public class ExternalProcessInputSplit extends GenericInputSplit {
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
super.read(in);
this.extProcessCommand = StringRecord.readString(in);
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
super.write(out);
StringRecord.writeString(out, this.extProcessCommand);
}
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.nephele;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.StringUtils;
import io.netty.buffer.ByteBuf;
......@@ -152,13 +152,13 @@ public class AbstractID implements IOReadableWritable {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.lowerPart = in.readLong();
this.upperPart = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeLong(this.lowerPart);
out.writeLong(this.upperPart);
}
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.nephele.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.util.EnumUtils;
/**
......@@ -78,7 +78,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read the return code
this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
......@@ -89,7 +89,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write the return code
EnumUtils.writeEnum(out, this.returnCode);
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.util.SerializableArrayList;
......@@ -62,7 +62,7 @@ public class JobProgressResult extends AbstractJobResult {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
this.events.read(in);
......@@ -70,7 +70,7 @@ public class JobProgressResult extends AbstractJobResult {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
this.events.write(out);
......
......@@ -13,8 +13,9 @@
package eu.stratosphere.nephele.client;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
/**
......@@ -49,13 +50,13 @@ public class JobSubmissionResult extends AbstractJobResult {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
}
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.deployment;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.runtime.io.channels.ChannelID;
/**
......@@ -72,7 +72,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
this.outputChannelID.write(out);
this.inputChannelID.write(out);
......@@ -80,7 +80,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.outputChannelID.read(in);
this.inputChannelID.read(in);
......
......@@ -13,14 +13,14 @@
package eu.stratosphere.nephele.deployment;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -93,7 +93,7 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
this.gateID.write(out);
EnumUtils.writeEnum(out, channelType);
......@@ -106,7 +106,7 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.gateID.read(in);
this.channelType = EnumUtils.readEnum(in, ChannelType.class);
......
......@@ -13,13 +13,13 @@
package eu.stratosphere.nephele.deployment;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -189,7 +189,7 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
this.jobID.write(out);
this.vertexID.write(out);
......@@ -222,7 +222,7 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
@SuppressWarnings("unchecked")
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.jobID.read(in);
this.vertexID.read(in);
......
......@@ -13,14 +13,14 @@
package eu.stratosphere.nephele.event.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* An abstract event is transmitted from the job manager to the
......@@ -74,7 +74,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read the timestamp
this.timestamp = in.readLong();
......@@ -83,7 +83,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write the timestamp
out.writeLong(this.timestamp);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.event.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -86,7 +86,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
......@@ -96,7 +96,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.event.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -68,7 +68,7 @@ public class JobEvent extends AbstractEvent {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
// Read job status
......@@ -80,7 +80,7 @@ public class JobEvent extends AbstractEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
// Write job status
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.event.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -139,7 +139,7 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
// Read the job ID
......@@ -161,7 +161,7 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
// Write the job ID
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.event.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
/**
......@@ -83,7 +83,7 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
......@@ -93,7 +93,7 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.event.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.jobgraph.JobVertexID;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -108,7 +108,7 @@ public class VertexEvent extends AbstractEvent {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
......@@ -122,7 +122,7 @@ public class VertexEvent extends AbstractEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
......
......@@ -30,8 +30,9 @@
package eu.stratosphere.nephele.event.task;
import java.io.DataInput;
import java.io.DataOutput;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import java.io.IOException;
/**
......@@ -74,13 +75,13 @@ public class IntegerTaskEvent extends AbstractTaskEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.value);
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.value = in.readInt();
}
......
......@@ -29,11 +29,11 @@
*/
package eu.stratosphere.nephele.event.task;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* This class provides a simple implementation of an event that holds a string value.
......@@ -73,14 +73,14 @@ public class StringTaskEvent extends AbstractTaskEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
StringRecord.writeString(out, this.message);
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.message = StringRecord.readString(in);
}
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.nephele.execution.librarycache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* A library cache profile request includes a set of library names and issues a task manager to report which of these
......@@ -54,7 +54,7 @@ public class LibraryCacheProfileRequest implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read required jar files
this.requiredLibraries = new String[in.readInt()];
......@@ -66,7 +66,7 @@ public class LibraryCacheProfileRequest implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
if (this.requiredLibraries == null) {
throw new IOException("requiredLibraries is null");
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.nephele.execution.librarycache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* A library cache profile response is the response to a library cache profile request. It contains the set of
......@@ -92,7 +92,7 @@ public class LibraryCacheProfileResponse implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read the names of the required jar files
this.requiredLibraries = new String[in.readInt()];
......@@ -110,7 +110,7 @@ public class LibraryCacheProfileResponse implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
if (this.requiredLibraries == null) {
throw new IOException("requiredLibraries is null");
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.execution.librarycache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* This class is used to encapsulate the transmission of a library file in a Nephele RPC call.
......@@ -48,14 +48,14 @@ public class LibraryCacheUpdate implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
LibraryCacheManager.readLibraryFromStream(in);
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
if (this.libraryFileName == null) {
throw new IOException("libraryFileName is null");
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.instance;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* A hardware description reflects the hardware environment which is actually present on the task manager's compute
......@@ -68,7 +68,7 @@ public final class HardwareDescription implements IOReadableWritable {
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.numberOfCPUCores);
out.writeLong(this.sizeOfPhysicalMemory);
......@@ -76,7 +76,7 @@ public final class HardwareDescription implements IOReadableWritable {
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.numberOfCPUCores = in.readInt();
this.sizeOfPhysicalMemory = in.readLong();
......
......@@ -13,14 +13,14 @@
package eu.stratosphere.nephele.instance;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.StringUtils;
/**
......@@ -206,7 +206,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
final int addr_length = in.readInt();
byte[] address = new byte[addr_length];
......@@ -226,7 +226,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.inetAddress.getAddress().length);
out.write(this.inetAddress.getAddress());
......
......@@ -21,6 +21,7 @@ package eu.stratosphere.nephele.ipc;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.util.IOUtils;
import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
......@@ -514,7 +515,7 @@ public class Client {
} catch (IllegalAccessException e) {
LOG.error(e);
}
value.read(in); // read value
value.read(new InputViewDataInputStreamWrapper(in)); // read value
}
call.setValue(value);
} else if (state == Status.ERROR.state) {
......
......@@ -19,12 +19,12 @@
package eu.stratosphere.nephele.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
/**
* The IPC connection header sent by the client to the server
......@@ -49,14 +49,14 @@ class ConnectionHeader implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
this.protocol = StringRecord.readString(in);
}
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
StringRecord.writeString(out, this.protocol);
}
......
......@@ -19,8 +19,6 @@
package eu.stratosphere.nephele.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
......@@ -34,6 +32,8 @@ import java.util.Map;
import javax.net.SocketFactory;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -102,7 +102,7 @@ public class RPC {
// TODO: See if type safety can be improved here
@SuppressWarnings("unchecked")
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.methodName = StringRecord.readString(in);
this.parameters = new IOReadableWritable[in.readInt()];
......@@ -140,7 +140,7 @@ public class RPC {
}
}
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
StringRecord.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
......
......@@ -52,6 +52,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -875,7 +877,7 @@ public abstract class Server {
// / Reads the connection header following version
private void processProtocol() throws IOException {
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data.array()));
header.read(in);
header.read(new InputViewDataInputStreamWrapper(in));
try {
String protocolClassName = header.getProtocol();
if (protocolClassName != null) {
......@@ -894,7 +896,7 @@ public abstract class Server {
IOReadableWritable invocation = newInstance(invocationClass); // read param
invocation.read(dis);
invocation.read(new InputViewDataInputStreamWrapper(dis));
Call call = new Call(id, invocation, this);
callQueue.put(call); // queue the call; maybe blocked here
......@@ -1048,7 +1050,7 @@ public abstract class Server {
} else {
out.writeBoolean(true);
StringRecord.writeString(out, rv.getClass().getName());
rv.write(out);
rv.write(new OutputViewDataOutputStreamWrapper(out));
}
} else {
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.jobgraph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import org.apache.commons.lang.Validate;
import eu.stratosphere.configuration.Configuration;
......@@ -374,7 +374,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
@SuppressWarnings("unchecked")
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
if (jobGraph == null) {
throw new IOException("jobGraph is null, cannot deserialize");
......@@ -454,7 +454,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Number of subtasks
out.writeInt(this.numberOfSubtasks);
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.nephele.jobgraph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
......@@ -34,6 +32,8 @@ import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.util.ClassUtils;
......@@ -465,7 +465,7 @@ public class JobGraph implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read job id
this.jobID.read(in);
......@@ -553,7 +553,7 @@ public class JobGraph implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write job ID
this.jobID.write(out);
......@@ -598,7 +598,8 @@ public class JobGraph implements IOReadableWritable {
* @throws IOException
* thrown if an error occurs while writing to the stream
*/
private void writeRequiredJarFiles(final DataOutput out, final AbstractJobVertex[] jobVertices) throws IOException {
private void writeRequiredJarFiles(final DataOutputView out, final AbstractJobVertex[] jobVertices) throws
IOException {
// Now check if all the collected jar files really exist
final FileSystem fs = FileSystem.getLocalFileSystem();
......@@ -643,7 +644,7 @@ public class JobGraph implements IOReadableWritable {
* @throws IOException
* thrown if an error occurs while reading the stream
*/
private void readRequiredJarFiles(final DataInput in) throws IOException {
private void readRequiredJarFiles(final DataInputView in) throws IOException {
// Do jar files follow;
final int numJars = in.readInt();
......
......@@ -13,13 +13,13 @@
package eu.stratosphere.nephele.jobmanager.splitassigner;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.util.StringUtils;
......@@ -68,7 +68,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write the job ID
this.jobID.write(out);
......@@ -90,7 +90,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
@SuppressWarnings("unchecked")
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read the job ID
this.jobID.read(in);
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.nephele.managementgraph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
......@@ -28,6 +26,8 @@ import java.util.Map;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -406,7 +406,7 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read job ID
this.jobID.read(in);
......@@ -485,7 +485,7 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write job ID
this.jobID.write(out);
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.nephele.managementgraph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -23,6 +21,8 @@ import java.util.List;
import java.util.Map;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -342,7 +342,7 @@ public final class ManagementGroupVertex extends ManagementAttachment implements
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
int numberOfForwardLinks = in.readInt();
for (int i = 0; i < numberOfForwardLinks; i++) {
......@@ -359,7 +359,7 @@ public final class ManagementGroupVertex extends ManagementAttachment implements
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write the number of forward links
out.writeInt(this.forwardEdges.size());
......
......@@ -13,14 +13,14 @@
package eu.stratosphere.nephele.managementgraph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.util.EnumUtils;
import eu.stratosphere.util.StringUtils;
......@@ -266,7 +266,7 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
}
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
// Read the execution state
this.executionState = EnumUtils.readEnum(in, ExecutionState.class);
......@@ -288,7 +288,7 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
// Write the execution state
EnumUtils.writeEnum(out, this.executionState);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.impl.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -47,14 +47,14 @@ public abstract class InternalExecutionVertexProfilingData implements InternalPr
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.jobId.read(in);
this.executionVertexID.read(in);
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
this.jobId.write(out);
this.executionVertexID.write(out);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.impl.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -48,7 +48,7 @@ public class InternalExecutionVertexThreadProfilingData extends InternalExecutio
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
super.read(in);
......@@ -60,7 +60,7 @@ public class InternalExecutionVertexThreadProfilingData extends InternalExecutio
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
super.write(out);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.impl.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -50,7 +50,7 @@ public class InternalInputGateProfilingData implements InternalProfilingData {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.jobId.read(in);
this.executionVertexID.read(in);
......@@ -60,7 +60,7 @@ public class InternalInputGateProfilingData implements InternalProfilingData {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
this.jobId.write(out);
this.executionVertexID.write(out);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.impl.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
public class InternalInstanceProfilingData implements InternalProfilingData {
......@@ -145,7 +145,7 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.freeMemory = in.readLong();
this.ioWaitCPU = in.readInt();
......@@ -166,7 +166,7 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeLong(this.freeMemory);
out.writeInt(this.ioWaitCPU);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.impl.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -50,7 +50,7 @@ public class InternalOutputGateProfilingData implements InternalProfilingData {
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.jobId.read(in);
this.executionVertexID.read(in);
......@@ -60,7 +60,7 @@ public class InternalOutputGateProfilingData implements InternalProfilingData {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
this.jobId.write(out);
this.executionVertexID.write(out);
......
......@@ -13,8 +13,6 @@
package eu.stratosphere.nephele.profiling.impl.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
......@@ -22,6 +20,8 @@ import java.util.Queue;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.util.StringUtils;
public class ProfilingDataContainer implements IOReadableWritable {
......@@ -43,7 +43,7 @@ public class ProfilingDataContainer implements IOReadableWritable {
@SuppressWarnings("unchecked")
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
final int numberOfRecords = in.readInt();
for (int i = 0; i < numberOfRecords; i++) {
......@@ -83,7 +83,7 @@ public class ProfilingDataContainer implements IOReadableWritable {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
// Write the number of records
out.writeInt(this.queuedProfilingData.size());
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
......@@ -96,7 +96,7 @@ public final class InputGateProfilingEvent extends VertexProfilingEvent {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
this.gateIndex = in.readInt();
......@@ -105,7 +105,7 @@ public final class InputGateProfilingEvent extends VertexProfilingEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
out.writeInt(this.gateIndex);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
/**
......@@ -300,7 +300,7 @@ public abstract class InstanceProfilingEvent extends ProfilingEvent {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
this.profilingInterval = in.readInt();
......@@ -324,7 +324,7 @@ public abstract class InstanceProfilingEvent extends ProfilingEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
out.writeInt(this.profilingInterval);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
......@@ -94,7 +94,7 @@ public final class OutputGateProfilingEvent extends VertexProfilingEvent {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
this.gateIndex = in.readInt();
......@@ -103,7 +103,7 @@ public final class OutputGateProfilingEvent extends VertexProfilingEvent {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
out.writeInt(this.gateIndex);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.ManagementEvent;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -86,7 +86,7 @@ public abstract class ProfilingEvent extends AbstractEvent implements Management
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
super.read(in);
this.jobID = new JobID();
......@@ -97,7 +97,7 @@ public abstract class ProfilingEvent extends AbstractEvent implements Management
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
super.write(out);
this.jobID.write(out);
......
......@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
/**
......@@ -98,7 +98,7 @@ public final class SingleInstanceProfilingEvent extends InstanceProfilingEvent {
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
super.read(in);
this.instanceName = StringRecord.readString(in);
......@@ -106,7 +106,7 @@ public final class SingleInstanceProfilingEvent extends InstanceProfilingEvent {
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
super.write(out);
StringRecord.writeString(out, this.instanceName);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
......@@ -91,7 +91,7 @@ public class ThreadProfilingEvent extends VertexProfilingEvent {
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
super.read(in);
this.userTime = in.readInt();
......@@ -102,7 +102,7 @@ public class ThreadProfilingEvent extends VertexProfilingEvent {
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
super.write(out);
out.writeInt(this.userTime);
......
......@@ -13,10 +13,10 @@
package eu.stratosphere.nephele.profiling.types;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
......@@ -65,7 +65,7 @@ public abstract class VertexProfilingEvent extends ProfilingEvent {
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
super.read(in);
this.vertexID = new ManagementVertexID();
......@@ -76,7 +76,7 @@ public abstract class VertexProfilingEvent extends ProfilingEvent {
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
super.write(out);
this.vertexID.write(out);
......
......@@ -13,14 +13,14 @@
package eu.stratosphere.nephele.services.accumulators;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.common.accumulators.Accumulator;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.util.StringUtils;
......@@ -60,7 +60,7 @@ public class AccumulatorEvent implements IOReadableWritable {
}
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
out.writeBoolean(this.useUserClassLoader);
jobID.write(out);
out.writeInt(accumulators.size());
......@@ -74,7 +74,7 @@ public class AccumulatorEvent implements IOReadableWritable {
@SuppressWarnings("unchecked")
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.useUserClassLoader = in.readBoolean();
jobID = new JobID();
jobID.read(in);
......
......@@ -183,44 +183,69 @@ public abstract class AbstractPagedInputView implements DataInputView {
// --------------------------------------------------------------------------------------------
// Data Input Specific methods
// --------------------------------------------------------------------------------------------
@Override
public void readFully(byte[] b) throws IOException {
readFully(b, 0, b.length);
public int read(byte[] b) throws IOException{
return read(b,0,b.length);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
public int read(byte[] b, int off, int len) throws IOException{
if (off < 0 || len < 0 || off + len > b.length) {
throw new IndexOutOfBoundsException();
}
int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= len) {
this.currentSegment.get(this.positionInSegment, b, off, len);
this.positionInSegment += len;
return len;
}
else {
if (remaining == 0) {
advance();
try {
advance();
}catch(EOFException eof){
return -1;
}
remaining = this.limitInSegment - this.positionInSegment;
}
int bytesRead = 0;
while (true) {
int toRead = Math.min(remaining, len);
int toRead = Math.min(remaining, len-bytesRead);
this.currentSegment.get(this.positionInSegment, b, off, toRead);
off += toRead;
len -= toRead;
if (len > 0) {
advance();
remaining = this.limitInSegment - this.positionInSegment;
bytesRead += toRead;
if (len > bytesRead) {
try {
advance();
}catch(EOFException eof){
return bytesRead;
}
remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += toRead;
break;
}
}
return len;
}
}
@Override
public void readFully(byte[] b) throws IOException {
readFully(b, 0, b.length);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
int bytesRead = read(b,off,len);
if(bytesRead == -1){
throw new EOFException("There is no more data left in the DataInputView.");
}
}
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.nephele.taskmanager;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -100,7 +100,7 @@ public abstract class AbstractTaskResult implements IOReadableWritable {
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
// Read the jobID
boolean isNotNull = in.readBoolean();
......@@ -118,7 +118,7 @@ public abstract class AbstractTaskResult implements IOReadableWritable {
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
// Write jobID
if (this.vertexID == null) {
......
......@@ -13,12 +13,12 @@
package eu.stratosphere.nephele.taskmanager;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -103,7 +103,7 @@ public class TaskExecutionState implements IOReadableWritable {
@Override
public void read(final DataInput in) throws IOException {
public void read(final DataInputView in) throws IOException {
boolean isNotNull = in.readBoolean();
......@@ -133,7 +133,7 @@ public class TaskExecutionState implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
public void write(final DataOutputView out) throws IOException {
if (this.jobID == null) {
out.writeBoolean(false);
......
......@@ -14,10 +14,10 @@
package eu.stratosphere.nephele.taskmanager.transferenvelope;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.util.EnumUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class RegisterTaskManagerResult implements IOReadableWritable {
......@@ -39,12 +39,12 @@ public class RegisterTaskManagerResult implements IOReadableWritable {
@Override
public void write(DataOutput out) throws IOException {
public void write(DataOutputView out) throws IOException {
EnumUtils.writeEnum(out, this.returnCode);
}
@Override
public void read(DataInput in) throws IOException {
public void read(DataInputView in) throws IOException {
this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册