Commit 7d9858a0 authored by Yingjun Wu, committed by Stephan Ewen

[streaming] refactor window operator and internal state management

Parent 9b494500
......@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
......@@ -127,7 +127,7 @@ public class DataStream<T extends Tuple> {
}
return returnStream;
}
/**
* Connecting DataStream outputs with each other. The streams connected
* using this operator will be transformed simultaneously. It creates a
......@@ -137,13 +137,21 @@ public class DataStream<T extends Tuple> {
* The DataStream to connect output with.
* @return The connected DataStream.
*/
public DataStream<T> connectWith(DataStream<T> stream) {
public DataStream<T> connectWith(DataStream<T>... streams) {
DataStream<T> returnStream = copy();
for(DataStream<T> stream:streams){
addConnection(returnStream, stream);
}
return returnStream;
}
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream){
returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.ctypes.addAll(stream.ctypes);
returnStream.cparams.addAll(stream.cparams);
returnStream.batchSizes.addAll(stream.batchSizes);
return returnStream;
}
......
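The varargs connectWith lets a caller merge any number of streams in one call. A minimal usage sketch; the source, map, and sink classes mirror the connectWith tests added in MapTest at the end of this commit:

DataStream<Tuple1<Integer>> source1 = env.addSource(new MySource1(), 1);
DataStream<Tuple1<Integer>> source2 = env.addSource(new MySource2(), 1);
env.addSource(new MySource3(), 1)
        .connectWith(source1, source2) // connect the outputs of all three sources
        .partitionBy(0)
        .map(new MyJoinMap(), 1)
        .addSink(new JoinSink());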
......@@ -28,6 +28,8 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
......@@ -35,8 +37,6 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -127,7 +127,7 @@ public class JobGraphBuilder {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInvokableClass(StreamSource.class);
source.setInputClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
......@@ -158,7 +158,7 @@ public class JobGraphBuilder {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setInvokableClass(StreamTask.class);
task.setTaskClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
......@@ -187,7 +187,7 @@ public class JobGraphBuilder {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setInvokableClass(StreamSink.class);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
......@@ -220,8 +220,7 @@ public class JobGraphBuilder {
int parallelism, int subtasksPerInstance) {
component.setNumberOfSubtasks(parallelism);
// TODO remove all NumberOfSubtasks setting
// component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
if (parallelism > maxParallelism) {
maxParallelism = parallelism;
......
......@@ -17,9 +17,10 @@ package eu.stratosphere.streaming.api;
import java.io.Serializable;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.tuple.Tuple;
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -16,8 +16,8 @@
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -19,8 +19,8 @@ import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -37,18 +37,19 @@ import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......@@ -77,13 +78,13 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public Collector<Tuple> collector;
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private List<Integer> numOfOutputs_f = new ArrayList<Integer>();
private List<Integer> batchSizesNotPartitioned = new ArrayList<Integer>();
private List<Integer> batchSizesPartitioned = new ArrayList<Integer>();
private List<Integer> numOfOutputsPartitioned = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputs_f = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsNotPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
public static int newComponent() {
numComponents++;
......@@ -117,47 +118,35 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollectorManager<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
collector = new StreamCollectorManager<Tuple>(batchSizesNotPartitioned,
batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout, id,
outSerializationDelegate, outputsPartitioned, outputsNotPartitioned);
return collector;
}
// TODO add type parameters to avoid redundant code
@SuppressWarnings({ "rawtypes", "unchecked" })
public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = taskConfiguration.getString("operatorName", "");
Object function = null;
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
Object function = in.readObject();
function = in.readObject();
if (operatorName.equals("flatMap")) {
setSerializer(function, FlatMapFunction.class);
setSerializerDeserializer(function, FlatMapFunction.class);
} else if (operatorName.equals("map")) {
setSerializer(function, MapFunction.class);
setSerializerDeserializer(function, MapFunction.class);
} else if (operatorName.equals("batchReduce")) {
setSerializer(function, GroupReduceFunction.class);
setSerializerDeserializer(function, GroupReduceFunction.class);
} else if (operatorName.equals("filter")) {
setSerializer(function, FilterFunction.class);
setSerializerDeserializer(function, FilterFunction.class);
} else if (operatorName.equals("sink")) {
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class,
function.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
setDeserializer(function, SinkFunction.class);
} else if (operatorName.equals("source")) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, function.getClass(), 0, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
setSerializer(function, UserSourceInvokable.class, 0);
} else if (operatorName.equals("elements")) {
outTupleTypeInfo = new TupleTypeInfo<Tuple>(TypeExtractor.getForObject(function));
......@@ -168,25 +157,43 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
} catch (Exception e) {
throw new StreamComponentException("Nonsupported object passed as operator");
throw new StreamComponentException("Nonsupported object (named " + operatorName
+ ") passed as operator");
}
}
private void setSerializer(Object function, Class<? extends AbstractFunction> clazz) {
private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
setDeserializer(function, clazz);
setSerializer(function, clazz, 1);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void setSerializer(Object function, Class<?> clazz, int typeParameter) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
1, null, null);
typeParameter, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
}
public void setSinkSerializer() {
if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo;
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......@@ -233,33 +240,40 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
outputPartitioner));
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
outputPartitioner));
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputs_f.size() < batchsizes_f.size()) {
outputs_f.add(outputs.get(i));
if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i));
} else {
outputs_s.add(outputs.get(i));
outputsNotPartitioned.add(outputs.get(i));
}
}
}
public UserSinkInvokable getSinkInvokable(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
/**
* Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @param config
* Configuration object
* @return The StreamComponent object
*/
private StreamComponent getInvokable(Class<? extends StreamComponent> userFunctionClass,
Configuration config) {
StreamComponent userFunction = null;
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSinkInvokable) ois.readObject();
userFunction = (StreamComponent) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
......@@ -269,58 +283,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
@SuppressWarnings("rawtypes")
public UserSinkInvokable getSinkInvokable(Configuration config) {
Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
return (UserSinkInvokable) getInvokable(userFunctionClass, config);
}
// TODO consider logging stack trace!
@SuppressWarnings("unchecked")
public UserTaskInvokable getTaskInvokable(Configuration taskConfiguration) {
@SuppressWarnings("rawtypes")
public UserTaskInvokable getTaskInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
UserTaskInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserTaskInvokable) ois.readObject();
// userFunction.declareOutputs(outputs, instanceID, name,
// recordBuffer,
// faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable) getInvokable(userFunctionClass, config);
}
public UserSourceInvokable getSourceInvokable(Configuration taskConfiguration) {
@SuppressWarnings("rawtypes")
public UserSourceInvokable getSourceInvokable(Configuration config) {
// Default value is a SourceInvokable
Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
UserSourceInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSourceInvokable) ois.readObject();
// userFunction.declareOutputs(outputs, instanceID, name,
// recordBuffer,
// faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable) getInvokable(userFunctionClass, config);
}
// TODO find a better solution for this
......@@ -340,47 +326,40 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
private void setPartitioner(Configuration config, int numberOfOutputs,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class);
Integer batchSize = taskConfiguration.getInteger("batchSize_" + nrOutput, 1);
Integer batchSize = config.getInteger("batchSize_" + numberOfOutputs, 1);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchsizes_f.add(batchSize);
numOfOutputs_f.add(taskConfiguration.getInteger("numOfOutputs_" + nrOutput, -1));
batchSizesPartitioned.add(batchSize);
numOfOutputsPartitioned.add(config
.getInteger("numOfOutputs_" + numberOfOutputs, -1));
// TODO:force one partitioning field
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
keyPosition = config.getInteger("partitionerIntParam_" + numberOfOutputs, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
batchsizes_s.add(batchSize);
batchSizesNotPartitioned.add(batchSize);
partitioners.add(partitioner.newInstance());
}
if (log.isTraceEnabled()) {
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with "
+ numberOfOutputs + " outputs");
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName()
+ " with " + nrOutput + " outputs", e);
+ " with " + numberOfOutputs + " outputs", e);
}
}
}
public void setSinkSerializer() {
if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo;
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
}
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
......
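The new getInvokable centralizes the deserialization that the three getters previously duplicated: it reads the "serializedudf" bytes from the Configuration and rebuilds the user function with an ObjectInputStream. The write side is not part of this diff; a hedged sketch of what it presumably looks like, where Configuration.setBytes is assumed to mirror the getBytes call used here:

// Hypothetical client-side counterpart of getInvokable (not in this commit):
// serialize the user function into the config entry that getInvokable reads.
private static void writeInvokable(Configuration config, java.io.Serializable userFunction)
        throws java.io.IOException {
    java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
    java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos);
    oos.writeObject(userFunction); // a UserSource/Task/SinkInvokable instance
    oos.close();
    config.setBytes("serializedudf", baos.toByteArray()); // assumed setter, mirroring getBytes
}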
......@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
......@@ -9,6 +10,7 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
......@@ -17,100 +19,126 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.io.AbstractSingleGateRecordReader;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractSingleGateRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
* A record writer connects an input gate to an application. It allows the application
* query for incoming records and read them from input gate.
* A record reader connects an input gate to an application. It allows the
* application to query for incoming records and read them from the input gate.
*
* @param <StreamRecord> The type of the record that can be read from this record reader.
*/
public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRecord> implements Reader<StreamRecord> {
public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRecord> implements
Reader<StreamRecord> {
private final Class<? extends StreamRecord> recordType;
private DeserializationDelegate<Tuple> deserializationDelegate;
private TupleSerializer<Tuple> tupleSerializer;
/**
* Stores the last read record.
*/
private StreamRecord lookahead;
/**
* Stores if more no more records will be received from the assigned input gate.
* Stores whether no more records will be received from the assigned input
* gate.
*/
private boolean noMoreRecordsWillFollow;
// --------------------------------------------------------------------------------------------
/**
* Constructs a new record reader and registers a new input gate with the application's environment.
* Constructs a new record reader and registers a new input gate with the
* application's environment.
*
* @param taskBase
* The application that instantiated the record reader.
* The application that instantiated the record reader.
* @param recordType
* The class of records that can be read from the record reader.
*/
public StreamRecordReader(AbstractTask taskBase,
Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(taskBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
super(taskBase,MutableRecordDeserializerFactory.<StreamRecord>get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
/**
* Constructs a new record reader and registers a new input gate with the
* application's environment.
*
* @param outputBase
* The application that instantiated the record reader.
* @param recordType
* The class of records that can be read from the record reader.
* The class of records that can be read from the record reader.
*/
public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord> recordType,
public StreamRecordReader(AbstractOutputTask outputBase,
Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
super(taskBase);
// super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
super(outputBase,MutableRecordDeserializerFactory.<StreamRecord>get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
// --------------------------------------------------------------------------------------------
/**
* Checks if at least one more record can be read from the associated input gate. This method may block
* until the associated input gate is able to read the record from one of its input channels.
* Checks if at least one more record can be read from the associated input
* gate. This method may block until the associated input gate is able to
* read the record from one of its input channels.
*
* @return <code>true</code>it at least one more record can be read from the associated input gate, otherwise
* <code>false</code>
* @return <code>true</code> if at least one more record can be read from the
*         associated input gate, otherwise <code>false</code>
*/
@Override
public boolean hasNext() throws IOException, InterruptedException{
public boolean hasNext() throws IOException, InterruptedException {
if (this.lookahead != null) {
return true;
} else {
if (this.noMoreRecordsWillFollow) {
return false;
}
StreamRecord record = instantiateRecordType();
record.setDeseralizationDelegate(deserializationDelegate, tupleSerializer);
while (true) {
InputChannelResult result = this.inputGate.readRecord(record);
switch (result) {
case INTERMEDIATE_RECORD_FROM_BUFFER:
case LAST_RECORD_FROM_BUFFER:
this.lookahead = record;
return true;
case END_OF_SUPERSTEP:
if (incrementEndOfSuperstepEventAndCheck()) {
return false;
}
else {
break; // fall through and wait for next record/event
}
case TASK_EVENT:
handleEvent(this.inputGate.getCurrentEvent());
break;
case END_OF_STREAM:
this.noMoreRecordsWillFollow = true;
case INTERMEDIATE_RECORD_FROM_BUFFER:
case LAST_RECORD_FROM_BUFFER:
this.lookahead = record;
return true;
case END_OF_SUPERSTEP:
if (incrementEndOfSuperstepEventAndCheck())
return false;
default:
; // fall through the loop
else
break; // fall through and wait for next record/event
case TASK_EVENT:
handleEvent(this.inputGate.getCurrentEvent());
break;
case END_OF_STREAM:
this.noMoreRecordsWillFollow = true;
return false;
default:
; // fall through the loop
}
}
}
......@@ -121,7 +149,8 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
*
* @return the current record from the associated input gate.
* @throws IOException
* thrown if any error occurs while reading the record from the input gate
* thrown if any error occurs while reading the record from the
* input gate
*/
@Override
public StreamRecord next() throws IOException, InterruptedException {
......@@ -133,19 +162,21 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
return null;
}
}
@Override
public boolean isInputClosed() {
return this.noMoreRecordsWillFollow;
}
private StreamRecord instantiateRecordType() {
try {
return this.recordType.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName()
+ "'.", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName()
+ "'.", e);
}
}
}
\ No newline at end of file
}
......@@ -19,12 +19,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends DataSinkTask {
public class StreamSink extends AbstractOutputTask {
private static final Log log = LogFactory.getLog(StreamSink.class);
......
......@@ -22,16 +22,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends DataSourceTask<DummyIS> {
public class StreamSource extends AbstractInputTask<DummyIS> {
private static final Log log = LogFactory.getLog(StreamSource.class);
......@@ -55,6 +55,16 @@ public class StreamSource extends DataSourceTask<DummyIS> {
sourceInstanceID = numSources;
}
@Override
public DummyIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<DummyIS> getInputSplitType() {
return null;
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
......
......@@ -22,16 +22,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends RegularPactTask {
public class StreamTask extends AbstractTask {
private static final Log log = LogFactory.getLog(StreamTask.class);
......
......@@ -19,67 +19,68 @@ import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
private static final long serialVersionUID = 1L;
public class StreamWindowTask<InTuple extends Tuple, OutTuple extends Tuple> extends FlatMapFunction<InTuple, OutTuple> {
private int computeGranularity;
private int windowFieldId = 1;
private int windowFieldId;
private ArrayList tempArrayList;
private SlidingWindowState window;
private MutableTableState<String, Integer> sum;
private ArrayList<InTuple> tempTupleArray;
private SlidingWindowState<InTuple> window;
private long initTimestamp = -1;
private long nextTimestamp = -1;
//protected StateCheckpointer checkpointer = new StateCheckpointer("object.out", 1000);
public StreamWindowTask(int windowSize, int slidingStep,
int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
window = new SlidingWindowState(windowSize, slidingStep,
window = new SlidingWindowState<InTuple>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
//checkpointer.RegisterState(window);
//Thread t = new Thread(checkpointer);
//t.start();
}
private void incrementCompute(ArrayList tupleArray) {}
protected void incrementCompute(ArrayList<InTuple> tupleArray) {}
private void decrementCompute(ArrayList tupleArray) {}
protected void decrementCompute(ArrayList<InTuple> tupleArray) {}
private void produceOutput(long progress, Collector out) {}
protected void produceOutput(long progress, Collector<OutTuple> out) {}
@Override
public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
public void flatMap(InTuple value, Collector<OutTuple> out)
throws Exception {
long progress = (Long) value.getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempArrayList = new ArrayList();
tempTupleArray = new ArrayList<InTuple>();
} else {
if (progress > nextTimestamp) {
if (window.isFull()) {
ArrayList expiredArrayList = window.popFront();
incrementCompute(tempArrayList);
decrementCompute(expiredArrayList);
window.pushBack(tempArrayList);
ArrayList<InTuple> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(progress, out);
}
} else {
incrementCompute(tempArrayList);
window.pushBack(tempArrayList);
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
produceOutput(progress, out);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempArrayList = new ArrayList();
tempTupleArray = new ArrayList<InTuple>();
}
tempArrayList.add(value);
}
tempTupleArray.add(value);
}
}
}
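The refactored StreamWindowTask is a template: flatMap drives the mini-batch and eviction logic, while subclasses override the three protected hooks. A minimal hypothetical subclass that counts the tuples in the window; WindowSumAggregate and WindowWordCountCounter below are the real subclasses introduced by this commit, and imports match theirs (java.util.ArrayList, Tuple2, Collector):

public class WindowCountTask extends StreamWindowTask<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
	private static final long serialVersionUID = 1L;

	private int count = 0;
	private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();

	public WindowCountTask(int windowSize, int slidingStep,
			int computeGranularity, int windowFieldId) {
		super(windowSize, slidingStep, computeGranularity, windowFieldId);
	}

	@Override
	protected void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
		count += tupleArray.size(); // a new mini-batch enters the window
	}

	@Override
	protected void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
		count -= tupleArray.size(); // an expired mini-batch leaves the window
	}

	@Override
	protected void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out) {
		outTuple.f0 = count; // emit (tuples currently in window, progress)
		outTuple.f1 = progress;
		out.collect(outTuple);
	}
}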
......@@ -19,10 +19,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......
......@@ -23,7 +23,7 @@ import eu.stratosphere.util.Collector;
public class KMeansSource extends SourceFunction<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private static final long DEFAULT_SEED = 4650285087650871364L;
private final long DEFAULT_SEED = 4650285087650871364L;
private Random random = new Random(DEFAULT_SEED);
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private int numCenter;
......
......@@ -17,84 +17,45 @@ package eu.stratosphere.streaming.examples.window.sum;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.streaming.api.streamcomponent.StreamWindowTask;
import eu.stratosphere.streaming.state.TableState;
import eu.stratosphere.util.Collector;
public class WindowSumAggregate extends
FlatMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
private ArrayList<Tuple2<Integer, Long>> tempTupleArray = null;
StreamWindowTask<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = -2832409561059237150L;
private TableState<String, Integer> sum;
private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
private SlidingWindowState window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
public WindowSumAggregate() {
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
public WindowSumAggregate(int windowSize, int slidingStep,
int computeGranularity, int windowFieldId) {
super(windowSize, slidingStep, computeGranularity, windowFieldId);
sum = new TableState<String, Integer>();
sum.put("sum", 0);
//checkpointer.RegisterState(sum);
}
private void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
@Override
protected void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") + number);
}
}
private void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
@Override
protected void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") - number);
}
}
private void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out){
@Override
protected void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out){
outTuple.f0 = sum.get("sum");
outTuple.f1 = progress;
out.collect(outTuple);
}
@Override
public void flatMap(Tuple2<Integer, Long> value,
Collector<Tuple2<Integer, Long>> out) throws Exception {
long progress = value.f1;
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<Integer, Long>>();
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
ArrayList<Tuple2<Integer, Long>> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(progress, out);
}
} else {
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
produceOutput(progress, out);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<Integer, Long>>();
}
}
tempTupleArray.add(value);
}
}
......@@ -24,15 +24,13 @@ public class WindowSumLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, Long>> dataStream = env
.addSource(new WindowSumSource(), SOURCE_PARALELISM)
.map(new WindowSumMultiple(), PARALELISM)
.flatMap(new WindowSumAggregate(), PARALELISM)
.flatMap(new WindowSumAggregate(100, 20, 10, 1), PARALELISM)
.addSink(new WindowSumSink());
env.execute();
......
......@@ -17,37 +17,28 @@ package eu.stratosphere.streaming.examples.window.wordcount;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.streaming.api.streamcomponent.StreamWindowTask;
import eu.stratosphere.streaming.state.TableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.util.Collector;
public class WindowWordCountCounter extends
FlatMapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {
StreamWindowTask<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L;
private int windowSize = 10;
private int slidingStep = 2;
private int computeGranularity = 1;
private ArrayList<Tuple2<String, Long>> tempTupleArray = null;
private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
private SlidingWindowState window;
private MutableTableState<String, Integer> wordCounts;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private Long timestamp = 0L;
private TableState<String, Integer> wordCounts;
public WindowWordCountCounter() {
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
wordCounts = new MutableTableState<String, Integer>();
public WindowWordCountCounter(int windowSize, int slidingStep,
int computeGranularity, int windowFieldId) {
super(windowSize, slidingStep, computeGranularity, windowFieldId);
wordCounts = new TableState<String, Integer>();
}
private void incrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
@Override
protected void incrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
String word = tupleArray.get(i).f0;
if (wordCounts.containsKey(word)) {
......@@ -59,7 +50,8 @@ public class WindowWordCountCounter extends
}
}
private void decrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
@Override
protected void decrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
String word = tupleArray.get(i).f0;
int count = wordCounts.get(word) - 1;
......@@ -71,47 +63,15 @@ public class WindowWordCountCounter extends
}
}
private void produceOutput(long progress, Collector<Tuple3<String, Integer, Long>> out) {
MutableTableStateIterator<String, Integer> iterator = wordCounts.getIterator();
@Override
protected void produceOutput(long progress, Collector<Tuple3<String, Integer, Long>> out) {
TableStateIterator<String, Integer> iterator = wordCounts.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
outTuple.f0 = tuple.f0;
outTuple.f1 = tuple.f1;
outTuple.f2 = timestamp;
outTuple.f2 = progress;
out.collect(outTuple);
}
}
@Override
public void flatMap(Tuple2<String, Long> value,
Collector<Tuple3<String, Integer, Long>> out) throws Exception {
timestamp = value.f1;
if (initTimestamp == -1) {
initTimestamp = timestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<String, Long>>();
} else {
if (timestamp >= nextTimestamp) {
if (window.isFull()) {
ArrayList<Tuple2<String, Long>> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(timestamp, out);
}
} else {
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
produceOutput(timestamp, out);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<String, Long>>();
}
}
tempTupleArray.add(value);
}
}
......@@ -34,7 +34,7 @@ public class WindowWordCountLocal {
.addSource(new WindowWordCountSource(), SOURCE_PARALELISM)
.flatMap(new WindowWordCountSplitter(), PARALELISM)
.partitionBy(0)
.flatMap(new WindowWordCountCounter(), PARALELISM)
.flatMap(new WindowWordCountCounter(10, 2, 1, 1), PARALELISM)
.addSink(new WindowWordCountSink());
env.execute();
......
......@@ -20,7 +20,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.util.PerformanceCounter;
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BroadcastPartitioner implements ChannelSelector<StreamRecord> {
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultPartitioner implements ChannelSelector<StreamRecord> {
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Grouping by a key
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Group to the partitioner with the lowest id
......
......@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.partitioner;
import java.util.Random;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Randomly group, to distribute equally
......
......@@ -28,25 +28,16 @@ public class WordCountPerformanceSplitter extends FlatMapFunction<Tuple1<String>
PerformanceCounter pCounter = new
PerformanceCounter("SplitterEmitCounter", 1000, 1000, 30000,
"/home/judit/strato/perf/broadcast4.csv");
"/home/mbalassi/strato-perf.csv");
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
pCounter.count();
}
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableTableState<K, V> implements TableState<K, V>, Serializable {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
public void put(K key, V value) {
state.put(key, value);
}
@Override
public V get(K key) {
return state.get(key);
}
@Override
public void delete(K key) {
state.remove(key);
}
@Override
public boolean containsKey(K key) {
return state.containsKey(key);
}
@Override
public MutableTableStateIterator<K, V> getIterator() {
return new MutableTableStateIterator<K, V>(state.entrySet().iterator());
}
@Override
public String serialize() {
return null;
}
@Override
public void deserialize(String str) {
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
public class MutableTableStateIterator<K, V> implements TableStateIterator<K, V>{
private Iterator<Entry<K, V>> iterator;
public MutableTableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public Tuple2<K, V> next() {
Entry<K, V> entry=iterator.next();
return new Tuple2<K, V>(entry.getKey(), entry.getValue());
}
}
......@@ -20,13 +20,15 @@ import java.util.ArrayList;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.api.java.tuple.Tuple;
/**
* The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class SlidingWindowState implements Serializable{
public class SlidingWindowState<InTuple extends Tuple> implements Serializable{
private static final long serialVersionUID = -2376149970115888901L;
private int currentRecordCount;
private int fullRecordCount;
......@@ -43,13 +45,13 @@ public class SlidingWindowState implements Serializable{
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(ArrayList tupleArray) {
public void pushBack(ArrayList<InTuple> tupleArray) {
buffer.add(tupleArray);
currentRecordCount += 1;
}
public ArrayList popFront() {
ArrayList frontRecord = (ArrayList) buffer.get();
public ArrayList<InTuple> popFront() {
ArrayList<InTuple> frontRecord = (ArrayList<InTuple>) buffer.get();
buffer.remove();
return frontRecord;
}
......
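A usage sketch of the generified state, with the window parameters that WindowSumLocal passes above (100, 20, 10). Presumably the buffer holds windowSize / computeGranularity = 10 mini-batches and the window becomes emittable every slidingStep / computeGranularity = 2 pushes; that arithmetic lives outside the shown hunk, so treat it as an assumption:

SlidingWindowState<Tuple2<Integer, Long>> window =
		new SlidingWindowState<Tuple2<Integer, Long>>(100, 20, 10);

ArrayList<Tuple2<Integer, Long>> miniBatch = new ArrayList<Tuple2<Integer, Long>>();
miniBatch.add(new Tuple2<Integer, Long>(42, 0L));
window.pushBack(miniBatch); // append the newest mini-batch
if (window.isFull()) {
	ArrayList<Tuple2<Integer, Long>> expired = window.popFront(); // evict the oldest
}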
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class SlidingWindowStateIterator<K>{
public boolean hasNext() {
return false;
}
public Tuple2<K, StreamRecord> next() {
return null;
}
}
......@@ -15,15 +15,34 @@
package eu.stratosphere.streaming.state;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* An internal state interface that supports stateful operator.
* The most general internal state that stores data in a mutable map.
*/
public interface TableState<K, V>{
public void put(K key, V value);
public V get(K key);
public void delete(K key);
public boolean containsKey(K key);
public String serialize();
public void deserialize(String str);
public TableStateIterator<K, V> getIterator();
public class TableState<K, V> implements Serializable {
private Map<K, V> state=new LinkedHashMap<K, V>();
public void put(K key, V value) {
state.put(key, value);
}
public V get(K key) {
return state.get(key);
}
public void delete(K key) {
state.remove(key);
}
public boolean containsKey(K key) {
return state.containsKey(key);
}
public TableStateIterator<K, V> getIterator() {
return new TableStateIterator<K, V>(state.entrySet().iterator());
}
}
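TableState is now a concrete, serializable map-backed class absorbing the old MutableTableState; a minimal usage sketch in the style of WindowWordCountCounter above:

TableState<String, Integer> wordCounts = new TableState<String, Integer>();
wordCounts.put("hello", 1);
if (wordCounts.containsKey("hello")) {
	wordCounts.put("hello", wordCounts.get("hello") + 1); // in-place update
}
TableStateIterator<String, Integer> iterator = wordCounts.getIterator();
while (iterator.hasNext()) {
	Tuple2<String, Integer> entry = iterator.next(); // (key, value) pair
}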
......@@ -15,12 +15,24 @@
package eu.stratosphere.streaming.state;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
/**
* the iterator for internal states.
*/
public interface TableStateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
public class TableStateIterator<K, V>{
private Iterator<Entry<K, V>> iterator;
public TableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter;
}
public boolean hasNext() {
return iterator.hasNext();
}
public Tuple2<K, V> next() {
Entry<K, V> entry=iterator.next();
return new Tuple2<K, V>(entry.getKey(), entry.getValue());
}
}
......@@ -15,22 +15,43 @@
package eu.stratosphere.streaming.state.manager;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedList;
import eu.stratosphere.streaming.state.TableState;
public class StateCheckpointer implements Runnable, Serializable {
public class StateCheckpointer {
private LinkedList<Object> stateList = new LinkedList<Object>();
ObjectOutputStream oos;
long timeInterval;
private LinkedList<TableState> stateList = new LinkedList<TableState>();
public void RegisterState(TableState state){
public StateCheckpointer(String filename, long timeIntervalMS) {
try {
oos = new ObjectOutputStream(new FileOutputStream(filename));
} catch (Exception e) {
e.printStackTrace();
}
this.timeInterval = timeIntervalMS;
}
public void RegisterState(Object state) {
stateList.add(state);
}
public void CheckpointStates(){
for(TableState state: stateList){
//take snapshot of every registered state.
}
@Override
public void run() {
// take snapshot of every registered state.
while (true) {
try {
Thread.sleep(timeInterval);
for (Object state : stateList) {
oos.writeObject(state);
oos.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
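The checkpointer now takes a target file and an interval, and its loop serializes every registered state with writeObject. The commented-out wiring in StreamWindowTask above hints at the intended use; a sketch assuming the Runnable variant of the class shown in this hunk:

StateCheckpointer checkpointer = new StateCheckpointer("object.out", 1000); // file, interval in ms
checkpointer.RegisterState(window); // any serializable state object
new Thread(checkpointer).start();   // run() sleeps, then snapshots each registered state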
......@@ -26,6 +26,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.util.Collector;
public class MapTest {
......@@ -39,6 +40,36 @@ public class MapTest {
}
}
}
public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 5; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MySource2 extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 5; i < 10; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MySource3 extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 10; i < 15; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MyMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
......@@ -48,6 +79,15 @@ public class MapTest {
return new Tuple1<Integer>(value.f0 * value.f0);
}
}
public static final class MyJoinMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
joinSetResult.add(value.f0);
return new Tuple1<Integer>(value.f0);
}
}
public static final class MyFieldsMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
......@@ -122,6 +162,13 @@ public class MapTest {
graphResult++;
}
}
public static final class JoinSink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<Integer> tuple) {
}
}
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
......@@ -138,6 +185,9 @@ public class MapTest {
private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
private static List<Integer> fromCollectionFields = new ArrayList<Integer>();
private static Set<Integer> fromCollectionDiffFieldsSet = new HashSet<Integer>();
private static Set<Integer> singleJoinSetExpected = new HashSet<Integer>();
private static Set<Integer> multipleJoinSetExpected = new HashSet<Integer>();
private static Set<Integer> joinSetResult = new HashSet<Integer>();
private static void fillExpectedList() {
for (int i = 0; i < 10; i++) {
......@@ -169,6 +219,19 @@ public class MapTest {
}
}
}
private static void fillSingleJoinSet() {
for (int i = 0; i < 10; i++) {
singleJoinSetExpected.add(i);
}
}
private static void fillMultipleJoinSet() {
for (int i = 0; i < 15; i++) {
multipleJoinSetExpected.add(i);
}
}
@Test
public void mapTest() throws Exception {
......@@ -287,5 +350,51 @@ public class MapTest {
// }
//
// }
@Test
public void singleConnectWithTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> source1 = env.addSource(new MySource1(),
1);
DataStream<Tuple1<Integer>> source2 = env
.addSource(new MySource2(), 1)
.connectWith(source1)
.partitionBy(0)
.map(new MyJoinMap(), 1)
.addSink(new JoinSink());
env.execute();
fillSingleJoinSet();
assertEquals(singleJoinSetExpected, joinSetResult);
}
@Test
public void multipleConnectWithTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> source1 = env.addSource(new MySource1(),
1);
DataStream<Tuple1<Integer>> source2 = env.addSource(new MySource2(),
1);
DataStream<Tuple1<Integer>> source3 = env
.addSource(new MySource3(), 1)
.connectWith(source1, source2)
.partitionBy(0)
.map(new MyJoinMap(), 1)
.addSink(new JoinSink());
env.execute();
fillMultipleJoinSet();
assertEquals(multipleJoinSetExpected, joinSetResult);
}
}
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
......@@ -24,12 +24,12 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollector2Test {
public class StreamCollectorManagerTest {
StreamCollectorManager<Tuple> collector;
......@@ -55,9 +55,7 @@ public class StreamCollector2Test {
fOut.add(rw2);
collector = new StreamCollectorManager<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
Tuple1<Integer> t = new Tuple1<Integer>();
StreamCollector<Tuple> sc1 = new StreamCollector<Tuple>(1, batchTimeout, channelID, null);
t.f0 = 0;
collector.collect(t);
......
......@@ -27,13 +27,13 @@ public class StreamCollectorTest {
@Test
public void testStreamCollector() {
StreamCollector collector = new StreamCollector(10, 1000, 0, null);
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(10, 1000, 0, null);
assertEquals(10, collector.batchSize);
}
@Test
public void testCollect() {
StreamCollector collector = new StreamCollector(2, 1000, 0, null);
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, 1000, 0, null);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......@@ -43,8 +43,7 @@ public class StreamCollectorTest {
@Test
public void testBatchSize() throws InterruptedException {
System.out.println("---------------");
StreamCollector collector = new StreamCollector(3, 100, 0, null);
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(3, 100, 0, null);
collector.collect(new Tuple1<Integer>(0));
collector.collect(new Tuple1<Integer>(0));
collector.collect(new Tuple1<Integer>(0));
......@@ -52,14 +51,13 @@ public class StreamCollectorTest {
Thread.sleep(200);
collector.collect(new Tuple1<Integer>(2));
collector.collect(new Tuple1<Integer>(3));
System.out.println("---------------");
}
@Test
public void recordWriter() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
StreamCollector collector = new StreamCollector(2, 1000, 0, null, recWriter);
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, 1000, 0, null, recWriter);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......
......@@ -16,16 +16,16 @@ package eu.stratosphere.streaming.api.streamcomponent;
import java.util.ArrayList;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class MockRecordWriter extends RecordWriter<StreamRecord> {
public ArrayList<StreamRecord> emittedRecords;
public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord> outputClass) {
super(inputBase);
public MockRecordWriter(AbstractInputTask<?> inputBase, Class<StreamRecord> outputClass) {
super(inputBase, outputClass);
}
public boolean initList() {
......
......@@ -21,7 +21,7 @@ import java.util.List;
import org.junit.Before;
import org.junit.Test;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class FaultToleranceUtilTest {
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
......@@ -26,7 +26,7 @@ public class InternalStateTest {
@Test
public void MutableTableStateTest(){
MutableTableState<String, String> state=new MutableTableState<String, String>();
TableState<String, String> state=new TableState<String, String>();
state.put("abc", "hello");
state.put("test", "world");
state.put("state", "mutable");
......