Commit 9b494500 authored by ghermann, committed by Stephan Ewen

[streaming] Started moving to 0.6

Parent b0219bba
......@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.5</stratosphere.version>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
......@@ -127,7 +127,7 @@ public class DataStream<T extends Tuple> {
}
return returnStream;
}
/**
* Connects DataStream outputs with each other. The streams connected
* using this operator will be transformed simultaneously. It creates a
......@@ -137,21 +137,13 @@ public class DataStream<T extends Tuple> {
* The DataStream to connect output with.
* @return The connected DataStream.
*/
public DataStream<T> connectWith(DataStream<T>... streams) {
public DataStream<T> connectWith(DataStream<T> stream) {
DataStream<T> returnStream = copy();
for(DataStream<T> stream:streams){
addConnection(returnStream, stream);
}
return returnStream;
}
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream){
returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.ctypes.addAll(stream.ctypes);
returnStream.cparams.addAll(stream.cparams);
returnStream.batchSizes.addAll(stream.batchSizes);
return returnStream;
}
......
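Note on the DataStream change above: with the varargs overload removed, connecting more than two streams is expressed by chaining, since each call returns the merged copy. A usage sketch only; the streams themselves are assumed for illustration, and whether chaining is exactly equivalent to the old varargs call depends on copy semantics not shown in this hunk:

    // first, second, third: existing DataStream<Tuple1<String>> instances (hypothetical)
    // Before: DataStream<Tuple1<String>> merged = first.connectWith(second, third);
    // After, one stream per call, chained on the returned copy:
    DataStream<Tuple1<String>> merged = first.connectWith(second).connectWith(third);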
......@@ -28,8 +28,6 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
......@@ -37,6 +35,8 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -127,7 +127,7 @@ public class JobGraphBuilder {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
source.setInvokableClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
......@@ -158,7 +158,7 @@ public class JobGraphBuilder {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
task.setInvokableClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
......@@ -187,7 +187,7 @@ public class JobGraphBuilder {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
sink.setInvokableClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
......@@ -220,7 +220,8 @@ public class JobGraphBuilder {
int parallelism, int subtasksPerInstance) {
component.setNumberOfSubtasks(parallelism);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
// TODO remove all NumberOfSubtasks setting
// component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
if (parallelism > maxParallelism) {
maxParallelism = parallelism;
......
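Taken together, the JobGraphBuilder changes move all three vertex kinds to the unified setInvokableClass call of the 0.6 jobgraph API. A minimal sketch of the resulting setup, assuming only the classes and calls visible in this diff; names and parallelism are illustrative:

    JobGraph jobGraph = new JobGraph("streaming job"); // name-taking constructor assumed

    JobInputVertex source = new JobInputVertex("source", jobGraph);
    source.setInvokableClass(StreamSource.class);      // was: source.setInputClass(...)

    JobTaskVertex task = new JobTaskVertex("task", jobGraph);
    task.setInvokableClass(StreamTask.class);          // was: task.setTaskClass(...)

    JobOutputVertex sink = new JobOutputVertex("sink", jobGraph);
    sink.setInvokableClass(StreamSink.class);          // was: sink.setOutputClass(...)

    task.setNumberOfSubtasks(4);                       // parallelism, as in setComponent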
......@@ -17,10 +17,9 @@ package eu.stratosphere.streaming.api;
import java.io.Serializable;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.tuple.Tuple;
public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
......
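With AbstractFunction gone from its declaration, SinkFunction is now a plain serializable base class. A minimal user sink might look as follows; the invoke callback name is an assumption for illustration, since the hunk only shows the class header and serialVersionUID:

    public class PrintSink extends SinkFunction<Tuple1<String>> {
        private static final long serialVersionUID = 1L;

        // hypothetical callback name; the abstract method is not shown in this hunk
        public void invoke(Tuple1<String> tuple) {
            System.out.println(tuple.f0);
        }
    }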
......@@ -16,8 +16,8 @@
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -19,8 +19,8 @@ import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -37,19 +37,18 @@ import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......@@ -78,13 +77,13 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public Collector<Tuple> collector;
private List<Integer> batchSizesNotPartitioned = new ArrayList<Integer>();
private List<Integer> batchSizesPartitioned = new ArrayList<Integer>();
private List<Integer> numOfOutputsPartitioned = new ArrayList<Integer>();
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private List<Integer> numOfOutputs_f = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputsNotPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputs_f = new ArrayList<RecordWriter<StreamRecord>>();
public static int newComponent() {
numComponents++;
......@@ -118,35 +117,47 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollectorManager<Tuple>(batchSizesNotPartitioned,
batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout, id,
outSerializationDelegate, outputsPartitioned, outputsNotPartitioned);
collector = new StreamCollectorManager<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
return collector;
}
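// Sketch of the producing side of the two config keys read above (the symmetric
// setters are an assumption; only the getters appear in this diff):
// Configuration taskConfiguration = new Configuration();
// taskConfiguration.setInteger("batchSize", 10);   // emit in batches of 10 records
// taskConfiguration.setLong("batchTimeout", 1000); // flush at least once a second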
// TODO add type parameters to avoid redundant code
@SuppressWarnings({ "rawtypes", "unchecked" })
public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = taskConfiguration.getString("operatorName", "");
Object function = null;
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
function = in.readObject();
Object function = in.readObject();
if (operatorName.equals("flatMap")) {
setSerializerDeserializer(function, FlatMapFunction.class);
setSerializer(function, FlatMapFunction.class);
} else if (operatorName.equals("map")) {
setSerializerDeserializer(function, MapFunction.class);
setSerializer(function, MapFunction.class);
} else if (operatorName.equals("batchReduce")) {
setSerializerDeserializer(function, GroupReduceFunction.class);
setSerializer(function, GroupReduceFunction.class);
} else if (operatorName.equals("filter")) {
setSerializerDeserializer(function, FilterFunction.class);
setSerializer(function, FilterFunction.class);
} else if (operatorName.equals("sink")) {
setDeserializer(function, SinkFunction.class);
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class,
function.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
} else if (operatorName.equals("source")) {
setSerializer(function, UserSourceInvokable.class, 0);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, function.getClass(), 0, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
} else if (operatorName.equals("elements")) {
outTupleTypeInfo = new TupleTypeInfo<Tuple>(TypeExtractor.getForObject(function));
......@@ -157,43 +168,25 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
} catch (Exception e) {
throw new StreamComponentException("Nonsupported object (named " + operatorName
+ ") passed as operator");
}
}
throw new StreamComponentException("Nonsupported object passed as operator");
private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
setDeserializer(function, clazz);
setSerializer(function, clazz, 1);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
private void setSerializer(Object function, Class<? extends AbstractFunction> clazz) {
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void setSerializer(Object function, Class<?> clazz, int typeParameter) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
typeParameter, null, null);
1, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
}
public void setSinkSerializer() {
if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo;
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......@@ -240,40 +233,33 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i));
if (outputs_f.size() < batchsizes_f.size()) {
outputs_f.add(outputs.get(i));
} else {
outputsNotPartitioned.add(outputs.get(i));
outputs_s.add(outputs.get(i));
}
}
}
/**
* Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @param config
* Configuration object
* @return The StreamComponent object
*/
private StreamComponent getInvokable(Class<? extends StreamComponent> userFunctionClass,
Configuration config) {
StreamComponent userFunction = null;
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
public UserSinkInvokable getSinkInvokable(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamComponent) ois.readObject();
userFunction = (UserSinkInvokable) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
......@@ -283,30 +269,58 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
@SuppressWarnings("rawtypes")
public UserSinkInvokable getSinkInvokable(Configuration config) {
Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
return (UserSinkInvokable) getInvokable(userFunctionClass, config);
}
// TODO consider logging stack trace!
@SuppressWarnings("rawtypes")
public UserTaskInvokable getTaskInvokable(Configuration config) {
@SuppressWarnings("unchecked")
public UserTaskInvokable getTaskInvokable(Configuration taskConfiguration) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable) getInvokable(userFunctionClass, config);
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
UserTaskInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserTaskInvokable) ois.readObject();
// userFunction.declareOutputs(outputs, instanceID, name,
// recordBuffer,
// faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
@SuppressWarnings("rawtypes")
public UserSourceInvokable getSourceInvokable(Configuration config) {
public UserSourceInvokable getSourceInvokable(Configuration taskConfiguration) {
// Default value is a SourceInvokable
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable) getInvokable(userFunctionClass, config);
Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
UserSourceInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSourceInvokable) ois.readObject();
// userFunction.declareOutputs(outputs, instanceID, name,
// recordBuffer,
// faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
// TODO find a better solution for this
......@@ -326,40 +340,47 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
private void setPartitioner(Configuration config, int numberOfOutputs,
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Integer batchSize = config.getInteger("batchSize_" + numberOfOutputs, 1);
Integer batchSize = taskConfiguration.getInteger("batchSize_" + nrOutput, 1);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchSizesPartitioned.add(batchSize);
numOfOutputsPartitioned.add(config
.getInteger("numOfOutputs_" + numberOfOutputs, -1));
batchsizes_f.add(batchSize);
numOfOutputs_f.add(taskConfiguration.getInteger("numOfOutputs_" + nrOutput, -1));
// TODO:force one partitioning field
keyPosition = config.getInteger("partitionerIntParam_" + numberOfOutputs, 1);
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
batchSizesNotPartitioned.add(batchSize);
batchsizes_s.add(batchSize);
partitioners.add(partitioner.newInstance());
}
if (log.isTraceEnabled()) {
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with "
+ numberOfOutputs + " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName()
+ " with " + numberOfOutputs + " outputs", e);
+ " with " + nrOutput + " outputs", e);
}
}
}
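// For reference, a FieldsPartitioner-shaped selector that fits the reflective
// construction above (getConstructor(int.class) with the key position). A sketch
// only: the class name is hypothetical, and the selectChannels contract and the
// StreamRecord accessors are assumptions, not spelled out in this commit.
public static class HashFieldsPartitioner implements ChannelSelector<StreamRecord> {
	private final int keyPosition;

	public HashFieldsPartitioner(int keyPosition) { // matches getConstructor(int.class)
		this.keyPosition = keyPosition;
	}

	@Override
	public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
		// route records with equal key fields to the same output channel
		Object key = record.getTuple(0).getField(keyPosition); // accessors assumed
		return new int[] { Math.abs(key.hashCode() % numberOfOutputChannels) };
	}
}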
public void setSinkSerializer() {
if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo;
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
}
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
......
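Each of the three get*Invokable methods now repeats the same step: deserialize the user function from the "serializedudf" bytes with plain Java serialization. The writing side is not part of this commit; below is a self-contained sketch of the round-trip these readers expect. The config key name comes from the diff, the stand-in UDF class is hypothetical:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class SerializedUdfRoundTrip {

        // Stand-in for a user function; the real code serializes a UserTaskInvokable etc.
        static class MyUdf implements Serializable {
            private static final long serialVersionUID = 1L;
            String greet() { return "hello from the deserialized udf"; }
        }

        public static void main(String[] args) throws Exception {
            // Producing side (not shown in this commit): function -> bytes,
            // stored in the task Configuration under the key "serializedudf".
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(new MyUdf());
            byte[] serializedUdf = bos.toByteArray();

            // Consuming side, mirroring getTaskInvokable above: bytes -> function.
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serializedUdf));
            MyUdf restored = (MyUdf) ois.readObject();
            System.out.println(restored.greet());
        }
    }

Plain Java serialization keeps the UDF opaque to the configuration; the trade-off is that a non-serializable field in a user function only fails at deserialization time, which is why the catch blocks above log the failure.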
......@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
......@@ -10,7 +9,6 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
......@@ -19,126 +17,100 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractSingleGateRecordReader;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractSingleGateRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
* A record reader connects an input gate to an application. It allows the application to
* query for incoming records and read them from the input gate.
*
* @param <StreamRecord> The type of the record that can be read from this record reader.
*/
public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRecord> implements Reader<StreamRecord> {
private final Class<? extends StreamRecord> recordType;
private DeserializationDelegate<Tuple> deserializationDelegate;
private TupleSerializer<Tuple> tupleSerializer;
/**
* Stores the last read record.
*/
private StreamRecord lookahead;
/**
* Stores whether no more records will be received from the assigned input gate.
*/
private boolean noMoreRecordsWillFollow;
// --------------------------------------------------------------------------------------------
/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param taskBase
* The application that instantiated the record reader.
* @param recordType
* The class of records that can be read from the record reader.
*/
public StreamRecordReader(AbstractTask taskBase,
Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(taskBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
super(taskBase,MutableRecordDeserializerFactory.<StreamRecord>get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param taskBase
*        The application that instantiated the record reader.
* @param recordType
*        The class of records that can be read from the record reader.
*/
public StreamRecordReader(AbstractOutputTask outputBase,
Class<? extends StreamRecord> recordType,
public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
super(outputBase,MutableRecordDeserializerFactory.<StreamRecord>get(), 0);
super(taskBase);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
// --------------------------------------------------------------------------------------------
/**
* Checks if at least one more record can be read from the associated input gate. This method may block
* until the associated input gate is able to read the record from one of its input channels.
*
* @return <code>true</code> if at least one more record can be read from the associated input gate,
*         otherwise <code>false</code>
*/
@Override
public boolean hasNext() throws IOException, InterruptedException {
if (this.lookahead != null) {
return true;
} else {
if (this.noMoreRecordsWillFollow) {
return false;
}
StreamRecord record = instantiateRecordType();
record.setDeseralizationDelegate(deserializationDelegate, tupleSerializer);
while (true) {
InputChannelResult result = this.inputGate.readRecord(record);
switch (result) {
case INTERMEDIATE_RECORD_FROM_BUFFER:
case LAST_RECORD_FROM_BUFFER:
	this.lookahead = record;
	return true;
case END_OF_SUPERSTEP:
	if (incrementEndOfSuperstepEventAndCheck()) {
		return false;
	}
	else {
		break; // fall through and wait for next record/event
	}
case TASK_EVENT:
	handleEvent(this.inputGate.getCurrentEvent());
	break;
case END_OF_STREAM:
	this.noMoreRecordsWillFollow = true;
	return false;
default:
	; // fall through the loop
}
}
}
......@@ -149,8 +121,7 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
*
* @return the current record from the associated input gate.
* @throws IOException
*         thrown if any error occurs while reading the record from the input gate
*/
@Override
public StreamRecord next() throws IOException, InterruptedException {
......@@ -162,21 +133,19 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
return null;
}
}
@Override
public boolean isInputClosed() {
return this.noMoreRecordsWillFollow;
}
private StreamRecord instantiateRecordType() {
try {
return this.recordType.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName()
+ "'.", e);
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName()
+ "'.", e);
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
}
}
}
}
\ No newline at end of file
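The hasNext/next pair above implements single-record lookahead: hasNext blocks and pulls from the gate until it can park one record in lookahead, and next hands that record out and clears it. Stripped of the event and superstep machinery, the pattern reduces to the following self-contained sketch, with a blocking queue standing in for the input gate and a sentinel value standing in for END_OF_STREAM:

    import java.util.concurrent.BlockingQueue;

    public class LookaheadReader<T> {
        private final BlockingQueue<T> inputGate; // stand-in for the real input gate
        private final T endOfStream;              // sentinel standing in for END_OF_STREAM
        private T lookahead;
        private boolean noMoreRecordsWillFollow;

        public LookaheadReader(BlockingQueue<T> inputGate, T endOfStream) {
            this.inputGate = inputGate;
            this.endOfStream = endOfStream;
        }

        public boolean hasNext() throws InterruptedException {
            if (lookahead != null) {
                return true;           // a record is already parked
            }
            if (noMoreRecordsWillFollow) {
                return false;
            }
            T record = inputGate.take(); // blocks, like readRecord in the real reader
            if (record == endOfStream) {
                noMoreRecordsWillFollow = true;
                return false;
            }
            lookahead = record;          // park it for the next() call
            return true;
        }

        public T next() throws InterruptedException {
            if (hasNext()) {
                T record = lookahead;
                lookahead = null;
                return record;
            }
            return null;
        }
    }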
......@@ -19,12 +19,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
public class StreamSink extends DataSinkTask {
private static final Log log = LogFactory.getLog(StreamSink.class);
......
......@@ -22,16 +22,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> {
public class StreamSource extends DataSourceTask<DummyIS> {
private static final Log log = LogFactory.getLog(StreamSource.class);
......@@ -55,16 +55,6 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
sourceInstanceID = numSources;
}
@Override
public DummyIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<DummyIS> getInputSplitType() {
return null;
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
......
......@@ -22,16 +22,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask {
public class StreamTask extends RegularPactTask {
private static final Log log = LogFactory.getLog(StreamTask.class);
......
......@@ -19,10 +19,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......