提交 916c3a83 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Iterative datafiles cleaned

上级 386a3328
......@@ -20,6 +20,7 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
......@@ -30,7 +31,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends U
}
@Override
public void invoke(StreamRecord record, StreamCollector<OUT> collector) throws Exception {
public void invoke(StreamRecord record, Collector<OUT> collector) throws Exception {
Iterator<IN> iterator = (Iterator<IN>) record.getBatchIterable().iterator();
reducer.reduce(iterator, collector);
}
......
......@@ -18,6 +18,7 @@ import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
FilterFunction<IN> filterFunction;
......@@ -27,7 +28,7 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
}
@Override
public void invoke(StreamRecord record, StreamCollector<IN> collector) throws Exception {
public void invoke(StreamRecord record, Collector<IN> collector) throws Exception {
for (int i = 0; i < record.getBatchSize(); i++) {
IN tuple = (IN) record.getTuple(i);
if (filterFunction.filter(tuple)) {
......
......@@ -19,6 +19,7 @@ import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class FlatMapInvokable<T extends Tuple, R extends Tuple> extends UserTaskInvokable<T, R> {
private static final long serialVersionUID = 1L;
......@@ -29,7 +30,7 @@ public class FlatMapInvokable<T extends Tuple, R extends Tuple> extends UserTask
}
@Override
public void invoke(StreamRecord record, StreamCollector<R> collector) throws Exception {
public void invoke(StreamRecord record, Collector<R> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
@SuppressWarnings("unchecked")
......
......@@ -460,6 +460,10 @@ public class JobGraphBuilder {
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger(
"numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
......
......@@ -19,6 +19,7 @@ import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class MapInvokable<T extends Tuple, R extends Tuple> extends UserTaskInvokable<T, R> {
private static final long serialVersionUID = 1L;
......@@ -29,7 +30,7 @@ public class MapInvokable<T extends Tuple, R extends Tuple> extends UserTaskInvo
}
@Override
public void invoke(StreamRecord record, StreamCollector<R> collector) throws Exception {
public void invoke(StreamRecord record, Collector<R> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
@SuppressWarnings("unchecked")
......
......@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
private static final long serialVersionUID = 1L;
......@@ -29,7 +30,7 @@ public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
}
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
public void invoke(StreamRecord record, Collector<Tuple> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
@SuppressWarnings("unchecked")
......
......@@ -53,6 +53,7 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
// TODO reconsider emitting mechanism at timeout (find a place to timeout)
@Override
public void collect(T tuple) {
//TODO: move copy to StreamCollector2
streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
counter++;
......
......@@ -28,45 +28,50 @@ public class StreamCollector2<T extends Tuple> implements Collector<T> {
ArrayList<StreamCollector<Tuple>> notPartitionedCollectors;
ArrayList<StreamCollector<Tuple>[]> partitionedCollectors;
List<RecordWriter<StreamRecord>> partitionedOutputs;
List<RecordWriter<StreamRecord>> notPartitionedOutputs;
int keyPostition;
// TODO consider channelID
public StreamCollector2(int[] batchSizesOfNotPartitioned, int[] batchSizesOfPartitioned,
int[] parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
public StreamCollector2(List<Integer> batchSizesOfNotPartitioned, List<Integer> batchSizesOfPartitioned,
List<Integer> parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate,
List<RecordWriter<StreamRecord>> outputs) {
List<RecordWriter<StreamRecord>> partitionedOutputs,
List<RecordWriter<StreamRecord>> notPartitionedOutputs) {
notPartitionedCollectors = new ArrayList<StreamCollector<Tuple>>(
batchSizesOfNotPartitioned.length);
batchSizesOfNotPartitioned.size());
partitionedCollectors = new ArrayList<StreamCollector<Tuple>[]>(
batchSizesOfPartitioned.length);
batchSizesOfPartitioned.size());
this.keyPostition = keyPosition;
for (int i = 0; i < batchSizesOfNotPartitioned.length; i++) {
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned[i],
batchTimeout, channelID, serializationDelegate, outputs));
for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, notPartitionedOutputs));
}
for (int i = 0; i < batchSizesOfPartitioned.length; i++) {
StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput[i]];
for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
for (int j = 0; j < collectors.length; j++) {
collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned[i],
batchTimeout, channelID, serializationDelegate, outputs);
List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
output.add(partitionedOutputs.get(i));
collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, output);
}
partitionedCollectors.add(collectors);
}
}
// TODO copy here instead of copying inside every StreamCollector
@Override
public void collect(T record) {
for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
collector.collect(record);
}
int partitionHash = Math.abs(record.getField(keyPostition).hashCode());
for (StreamCollector<Tuple>[] collectors : partitionedCollectors) {
collectors[partitionHash % collectors.length].collect(record);
}
......
......@@ -16,15 +16,15 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class DefaultSinkInvokable extends UserSinkInvokable<Tuple> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
public void invoke(StreamRecord record, Collector<Tuple> collector) throws Exception {
String value = (String) record.getTuple(0).getField(0);
System.out.println(value);
}
......
......@@ -16,15 +16,15 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class DefaultTaskInvokable extends UserTaskInvokable<Tuple, Tuple> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
public void invoke(StreamRecord record, Collector<Tuple> collector) throws Exception {
}
......
......@@ -16,11 +16,11 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
StreamComponent {
public abstract void invoke(StreamRecord record, StreamCollector<OUT> collector)
public abstract void invoke(StreamRecord record, Collector<OUT> collector)
throws Exception;
}
......@@ -46,6 +46,7 @@ import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.StreamCollector2;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
......@@ -62,6 +63,7 @@ import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.util.Collector;
public final class StreamComponentHelper<T extends AbstractInvokable> {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
......@@ -75,9 +77,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private TupleSerializer<Tuple> outTupleSerializer = null;
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public StreamCollector<Tuple> collector;
public Collector<Tuple> collector;
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private List<Integer> numOfOutputs_f = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
......@@ -112,14 +115,17 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public StreamCollector<Tuple> setCollector(Configuration taskConfiguration, int id,
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
outSerializationDelegate, outputs);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollector2<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
return collector;
}
......@@ -340,6 +346,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchsizes_f.add(batchSize);
numOfOutputs_f.add(taskConfiguration.getInteger("numOfOutputs_" + nrOutput, -1));
// TODO:force one partitioning field
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
......
......@@ -69,9 +69,9 @@ public class BatchReduceTest {
@Test
public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000);
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource())
.batchReduce(new MyBatchReduce()).addSink(new MySink());
.batch(4).batchReduce(new MyBatchReduce()).addSink(new MySink());
context.execute();
}
......
......@@ -15,10 +15,15 @@
package eu.stratosphere.streaming.api;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class StreamCollector2Test {
......@@ -26,14 +31,23 @@ public class StreamCollector2Test {
@Test
public void testCollect() {
int[] batchSizesOfNotPartitioned = new int[] {};
int[] batchSizesOfPartitioned = new int[] {2, 2};
int[] parallelismOfOutput = new int[] {2, 1};
List<Integer> batchSizesOfNotPartitioned = new ArrayList<Integer>();
List<Integer> batchSizesOfPartitioned = new ArrayList<Integer>();
batchSizesOfPartitioned.add(2);
batchSizesOfPartitioned.add(2);
List<Integer> parallelismOfOutput = new ArrayList<Integer>();
parallelismOfOutput.add(2);
parallelismOfOutput.add(2);
int keyPosition = 0;
long batchTimeout = 1000;
int channelID = 1;
collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, null);
List<RecordWriter<StreamRecord>> fOut = new ArrayList<RecordWriter<StreamRecord>>();
fOut.add(null);
fOut.add(null);
collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
Tuple1<Integer> t = new Tuple1<Integer>();
StreamCollector<Tuple> sc1 = new StreamCollector<Tuple>(1, batchTimeout, channelID, null);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册