提交 6c2a1b3e 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] refactor and jbuilder update for StreamRecord2

上级 697eb774
......@@ -20,6 +20,7 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
......@@ -30,7 +31,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends U
}
@Override
public void invoke(StreamRecord record, StreamCollector<OUT> collector) throws Exception {
public void invoke(StreamRecord record, Collector<OUT> collector) throws Exception {
Iterator<IN> iterator = (Iterator<IN>) record.getBatchIterable().iterator();
reducer.reduce(iterator, collector);
}
......
......@@ -18,6 +18,7 @@ import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
FilterFunction<IN> filterFunction;
......@@ -27,7 +28,7 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
}
@Override
public void invoke(StreamRecord record, StreamCollector<IN> collector) throws Exception {
public void invoke(StreamRecord record, Collector<IN> collector) throws Exception {
for (int i = 0; i < record.getBatchSize(); i++) {
IN tuple = (IN) record.getTuple(i);
if (filterFunction.filter(tuple)) {
......
......@@ -19,6 +19,7 @@ import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class FlatMapInvokable<T extends Tuple, R extends Tuple> extends UserTaskInvokable<T, R> {
private static final long serialVersionUID = 1L;
......@@ -29,7 +30,7 @@ public class FlatMapInvokable<T extends Tuple, R extends Tuple> extends UserTask
}
@Override
public void invoke(StreamRecord record, StreamCollector<R> collector) throws Exception {
public void invoke(StreamRecord record, Collector<R> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
@SuppressWarnings("unchecked")
......
......@@ -460,6 +460,10 @@ public class JobGraphBuilder {
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger(
"numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
......
......@@ -19,6 +19,7 @@ import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class MapInvokable<T extends Tuple, R extends Tuple> extends UserTaskInvokable<T, R> {
private static final long serialVersionUID = 1L;
......@@ -29,7 +30,7 @@ public class MapInvokable<T extends Tuple, R extends Tuple> extends UserTaskInvo
}
@Override
public void invoke(StreamRecord record, StreamCollector<R> collector) throws Exception {
public void invoke(StreamRecord record, Collector<R> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
@SuppressWarnings("unchecked")
......
......@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
private static final long serialVersionUID = 1L;
......@@ -29,7 +30,7 @@ public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
}
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
public void invoke(StreamRecord record, Collector<Tuple> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
@SuppressWarnings("unchecked")
......
......@@ -16,15 +16,15 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class DefaultSinkInvokable extends UserSinkInvokable<Tuple> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
public void invoke(StreamRecord record, Collector<Tuple> collector) throws Exception {
String value = (String) record.getTuple(0).getField(0);
System.out.println(value);
}
......
......@@ -16,15 +16,15 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class DefaultTaskInvokable extends UserTaskInvokable<Tuple, Tuple> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
public void invoke(StreamRecord record, Collector<Tuple> collector) throws Exception {
// TODO Auto-generated method stub
}
......
......@@ -16,11 +16,11 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
StreamComponent {
public abstract void invoke(StreamRecord record, StreamCollector<OUT> collector)
public abstract void invoke(StreamRecord record, Collector<OUT> collector)
throws Exception;
}
......@@ -46,6 +46,7 @@ import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.StreamCollector2;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
......@@ -62,6 +63,7 @@ import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.util.Collector;
public final class StreamComponentHelper<T extends AbstractInvokable> {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
......@@ -75,9 +77,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private TupleSerializer<Tuple> outTupleSerializer = null;
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public StreamCollector<Tuple> collector;
public Collector<Tuple> collector;
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private List<Integer> numOfOutputs_f = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
......@@ -112,7 +115,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public StreamCollector<Tuple> setCollector(Configuration taskConfiguration, int id,
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
......@@ -120,6 +123,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
outSerializationDelegate, outputs);
// collector = new StreamCollector2<Tuple>(batchsizes_s.toArray(),
// batchsizes_f.toArray(),numOfOutputs_f , keyPosition, batchTimeout,
// id, outSerializationDelegate, outputs);
return collector;
}
......@@ -340,6 +347,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchsizes_f.add(batchSize);
numOfOutputs_f.add(taskConfiguration.getInteger("numOfOutputs_" + nrOutput, -1));
// TODO:force one partitioning field
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册