提交 ebae500f 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] StreamComponentHelper refactor

上级 f08d55d0
......@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.5.1</stratosphere.version>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
......@@ -17,9 +17,10 @@ package eu.stratosphere.streaming.api;
import java.io.Serializable;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.tuple.Tuple;
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -49,6 +49,7 @@ import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......@@ -77,13 +78,13 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public Collector<Tuple> collector;
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private List<Integer> numOfOutputs_f = new ArrayList<Integer>();
private List<Integer> batchSizesNotPartitioned = new ArrayList<Integer>();
private List<Integer> batchSizesPartitioned = new ArrayList<Integer>();
private List<Integer> numOfOutputsPartitioned = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputs_f = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsNotPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
public static int newComponent() {
numComponents++;
......@@ -117,47 +118,35 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollectorManager<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
collector = new StreamCollectorManager<Tuple>(batchSizesNotPartitioned,
batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout, id,
outSerializationDelegate, outputsPartitioned, outputsNotPartitioned);
return collector;
}
// TODO add type parameters to avoid redundant code
@SuppressWarnings({ "rawtypes", "unchecked" })
public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = taskConfiguration.getString("operatorName", "");
Object function = null;
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
Object function = in.readObject();
function = in.readObject();
if (operatorName.equals("flatMap")) {
setSerializer(function, FlatMapFunction.class);
setSerializerDeserializer(function, FlatMapFunction.class);
} else if (operatorName.equals("map")) {
setSerializer(function, MapFunction.class);
setSerializerDeserializer(function, MapFunction.class);
} else if (operatorName.equals("batchReduce")) {
setSerializer(function, GroupReduceFunction.class);
setSerializerDeserializer(function, GroupReduceFunction.class);
} else if (operatorName.equals("filter")) {
setSerializer(function, FilterFunction.class);
setSerializerDeserializer(function, FilterFunction.class);
} else if (operatorName.equals("sink")) {
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class,
function.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
setDeserializer(function, SinkFunction.class);
} else if (operatorName.equals("source")) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, function.getClass(), 0, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
setSerializer(function, UserSourceInvokable.class, 0);
} else if (operatorName.equals("elements")) {
outTupleTypeInfo = new TupleTypeInfo<Tuple>(TypeExtractor.getForObject(function));
......@@ -168,25 +157,43 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
} catch (Exception e) {
throw new StreamComponentException("Nonsupported object passed as operator");
throw new StreamComponentException("Nonsupported object (named " + operatorName
+ ") passed as operator");
}
}
private void setSerializer(Object function, Class<? extends AbstractFunction> clazz) {
private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
setDeserializer(function, clazz);
setSerializer(function, clazz, 1);
}
/**
 * Sets up the input side of the component: extracts the tuple type found at
 * type parameter 0 of {@code clazz} for the runtime class of {@code function},
 * then creates the matching serializer and deserialization delegate.
 *
 * @param function
 *            operator object whose runtime class is inspected
 * @param clazz
 *            function base class handed to the type extractor
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
	final Class<?> functionClass = function.getClass();
	inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, functionClass, 0,
			null, null);
	inTupleSerializer = inTupleTypeInfo.createSerializer();
	inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void setSerializer(Object function, Class<?> clazz, int typeParameter) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
1, null, null);
typeParameter, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
}
/**
 * Reuses the output tuple type on the input side. When an output
 * serialization delegate has been set, the input type info is set to the
 * output type info and a serializer and deserialization delegate are created
 * from it; otherwise nothing is changed.
 */
public void setSinkSerializer() {
	if (outSerializationDelegate == null) {
		// No output type has been configured, so there is nothing to copy.
		return;
	}
	inTupleTypeInfo = outTupleTypeInfo;
	inTupleSerializer = inTupleTypeInfo.createSerializer();
	inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......@@ -240,26 +247,33 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputs_f.size() < batchsizes_f.size()) {
outputs_f.add(outputs.get(i));
if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i));
} else {
outputs_s.add(outputs.get(i));
outputsNotPartitioned.add(outputs.get(i));
}
}
}
public UserSinkInvokable getSinkInvokable(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
/**
* Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @param config
* Configuration object
* @return The StreamComponent object
*/
private StreamComponent getInvokable(Class<? extends StreamComponent> userFunctionClass,
Configuration config) {
StreamComponent userFunction = null;
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSinkInvokable) ois.readObject();
userFunction = (StreamComponent) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
......@@ -269,58 +283,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
/**
 * Creates the sink invokable described by the given configuration.
 *
 * @param config
 *            configuration read for the "userfunction" class entry and passed
 *            on to {@code getInvokable}
 * @return the object returned by {@code getInvokable}, cast to
 *         {@link UserSinkInvokable}
 */
@SuppressWarnings("rawtypes")
public UserSinkInvokable getSinkInvokable(Configuration config) {
	final Class<? extends UserSinkInvokable> sinkClass = config.getClass("userfunction",
			DefaultSinkInvokable.class, UserSinkInvokable.class);
	final StreamComponent invokable = getInvokable(sinkClass, config);
	return (UserSinkInvokable) invokable;
}
// TODO consider logging stack trace!
@SuppressWarnings("unchecked")
public UserTaskInvokable getTaskInvokable(Configuration taskConfiguration) {
@SuppressWarnings("rawtypes")
public UserTaskInvokable getTaskInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
UserTaskInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserTaskInvokable) ois.readObject();
// userFunction.declareOutputs(outputs, instanceID, name,
// recordBuffer,
// faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable) getInvokable(userFunctionClass, config);
}
public UserSourceInvokable getSourceInvokable(Configuration taskConfiguration) {
@SuppressWarnings("rawtypes")
public UserSourceInvokable getSourceInvokable(Configuration config) {
// Default value is a DefaultSourceInvokable
Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
UserSourceInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSourceInvokable) ois.readObject();
// userFunction.declareOutputs(outputs, instanceID, name,
// recordBuffer,
// faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable) getInvokable(userFunctionClass, config);
}
// TODO find a better solution for this
......@@ -340,47 +326,40 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
private void setPartitioner(Configuration config, int numberOfOutputs,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class);
Integer batchSize = taskConfiguration.getInteger("batchSize_" + nrOutput, 1);
Integer batchSize = config.getInteger("batchSize_" + numberOfOutputs, 1);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchsizes_f.add(batchSize);
numOfOutputs_f.add(taskConfiguration.getInteger("numOfOutputs_" + nrOutput, -1));
batchSizesPartitioned.add(batchSize);
numOfOutputsPartitioned.add(config
.getInteger("numOfOutputs_" + numberOfOutputs, -1));
// TODO:force one partitioning field
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
keyPosition = config.getInteger("partitionerIntParam_" + numberOfOutputs, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
batchsizes_s.add(batchSize);
batchSizesNotPartitioned.add(batchSize);
partitioners.add(partitioner.newInstance());
}
if (log.isTraceEnabled()) {
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with "
+ numberOfOutputs + " outputs");
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName()
+ " with " + nrOutput + " outputs", e);
+ " with " + numberOfOutputs + " outputs", e);
}
}
}
/**
 * Reuses the output tuple type on the input side. When an output
 * serialization delegate has been set, the input type info is set to the
 * output type info and a serializer and deserialization delegate are created
 * from it; otherwise nothing is changed.
 */
public void setSinkSerializer() {
	if (outSerializationDelegate == null) {
		// No output type has been configured, so there is nothing to copy.
		return;
	}
	inTupleTypeInfo = outTupleTypeInfo;
	inTupleSerializer = inTupleTypeInfo.createSerializer();
	inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册