Commit cb205d3a authored by gyfora, committed by Stephan Ewen

[streaming] StreamCollector2 added to streamcomponenthelper with fix

Parent 6c2a1b3e
@@ -53,6 +53,7 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
     // TODO reconsider emitting mechanism at timeout (find a place to timeout)
     @Override
     public void collect(T tuple) {
+        //TODO: move copy to StreamCollector2
         streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
         counter++;
......
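For context on the hunk above: collect() copies each incoming tuple into the current StreamRecord and advances a counter; the record is presumably emitted once the counter reaches the configured batch size (that part is outside this hunk). Below is a minimal standalone sketch of the same count-and-emit batching pattern; BatchingSketch and its names are illustrative only and not part of the commit.

import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {

    private final int batchSize;
    private final List<Object> batch;

    public BatchingSketch(int batchSize) {
        this.batchSize = batchSize;
        this.batch = new ArrayList<Object>(batchSize);
    }

    public void collect(Object element) {
        batch.add(element);                // the real collector copies the tuple first
        if (batch.size() == batchSize) {
            emit();
        }
    }

    private void emit() {
        System.out.println("emitting batch: " + batch);
        batch.clear();
    }

    public static void main(String[] args) {
        BatchingSketch sketch = new BatchingSketch(3);
        for (int i = 0; i < 7; i++) {
            sketch.collect(i);             // emits after elements 0-2 and 3-5
        }
    }
}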
@@ -28,45 +28,50 @@ public class StreamCollector2<T extends Tuple> implements Collector<T> {
     ArrayList<StreamCollector<Tuple>> notPartitionedCollectors;
     ArrayList<StreamCollector<Tuple>[]> partitionedCollectors;
+    List<RecordWriter<StreamRecord>> partitionedOutputs;
+    List<RecordWriter<StreamRecord>> notPartitionedOutputs;
     int keyPostition;
     // TODO consider channelID
-    public StreamCollector2(int[] batchSizesOfNotPartitioned, int[] batchSizesOfPartitioned,
-            int[] parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
+    public StreamCollector2(List<Integer> batchSizesOfNotPartitioned, List<Integer> batchSizesOfPartitioned,
+            List<Integer> parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
             SerializationDelegate<Tuple> serializationDelegate,
-            List<RecordWriter<StreamRecord>> outputs) {
+            List<RecordWriter<StreamRecord>> partitionedOutputs,
+            List<RecordWriter<StreamRecord>> notPartitionedOutputs) {
         notPartitionedCollectors = new ArrayList<StreamCollector<Tuple>>(
-                batchSizesOfNotPartitioned.length);
+                batchSizesOfNotPartitioned.size());
         partitionedCollectors = new ArrayList<StreamCollector<Tuple>[]>(
-                batchSizesOfPartitioned.length);
+                batchSizesOfPartitioned.size());
         this.keyPostition = keyPosition;
-        for (int i = 0; i < batchSizesOfNotPartitioned.length; i++) {
-            notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned[i],
-                    batchTimeout, channelID, serializationDelegate, outputs));
+        for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
+            notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i),
+                    batchTimeout, channelID, serializationDelegate, notPartitionedOutputs));
         }
-        for (int i = 0; i < batchSizesOfPartitioned.length; i++) {
-            StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput[i]];
+        for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
+            StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
             for (int j = 0; j < collectors.length; j++) {
-                collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned[i],
-                        batchTimeout, channelID, serializationDelegate, outputs);
+                List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
+                output.add(partitionedOutputs.get(i));
+                collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
+                        batchTimeout, channelID, serializationDelegate, output);
             }
             partitionedCollectors.add(collectors);
        }
    }
+    // TODO copy here instead of copying inside every StreamCollector
    @Override
    public void collect(T record) {
        for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
            collector.collect(record);
        }
        int partitionHash = Math.abs(record.getField(keyPostition).hashCode());
        for (StreamCollector<Tuple>[] collectors : partitionedCollectors) {
            collectors[partitionHash % collectors.length].collect(record);
        }
......
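The collect() above fans each record out twice: it goes to every non-partitioned collector as-is, and for each partitioned output it is routed to exactly one of the parallel sub-collectors by hashing the key field. A standalone sketch of that selection rule follows; FanOutSketch and the string "sub-collectors" are illustrative stand-ins, not code from the commit.

import java.util.Arrays;
import java.util.List;

public class FanOutSketch {

    // Same selection rule as the diff: Math.abs(hashCode()) % parallelism.
    // Caveat inherited from that rule: Math.abs(Integer.MIN_VALUE) is still negative.
    static int selectSubCollector(Object keyField, int parallelism) {
        return Math.abs(keyField.hashCode()) % parallelism;
    }

    public static void main(String[] args) {
        List<String> subCollectors = Arrays.asList("sub-0", "sub-1");
        for (int key = 0; key < 5; key++) {
            int index = selectSubCollector(Integer.valueOf(key), subCollectors.size());
            System.out.println("key " + key + " -> " + subCollectors.get(index));
        }
    }
}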
@@ -121,12 +121,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
         int batchSize = taskConfiguration.getInteger("batchSize", 1);
         long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
-        collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
-                outSerializationDelegate, outputs);
+        // collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
+        // outSerializationDelegate, outputs);
-        // collector = new StreamCollector2<Tuple>(batchsizes_s.toArray(),
-        // batchsizes_f.toArray(),numOfOutputs_f , keyPosition, batchTimeout,
-        // id, outSerializationDelegate, outputs);
+        collector = new StreamCollector2<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
+                keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
         return collector;
     }
......
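By argument position, outputs_f is passed as the new partitionedOutputs parameter and outputs_s as notPartitionedOutputs, so the helper now keeps its record writers in two separate lists instead of one. A standalone sketch of that split is below; the Writer class and its partitioned flag are hypothetical stand-ins, not the real RecordWriter API or the helper's actual classification logic.

import java.util.ArrayList;
import java.util.List;

public class OutputSplitSketch {

    // Hypothetical stand-in for a record writer plus whatever flag the
    // component uses to decide whether an output edge is partitioned.
    static class Writer {
        final String name;
        final boolean partitioned;

        Writer(String name, boolean partitioned) {
            this.name = name;
            this.partitioned = partitioned;
        }
    }

    public static void main(String[] args) {
        List<Writer> allOutputs = new ArrayList<Writer>();
        allOutputs.add(new Writer("to-map", true));
        allOutputs.add(new Writer("to-sink", false));

        List<Writer> partitioned = new ArrayList<Writer>();     // plays the role of outputs_f
        List<Writer> notPartitioned = new ArrayList<Writer>();  // plays the role of outputs_s
        for (Writer writer : allOutputs) {
            if (writer.partitioned) {
                partitioned.add(writer);
            } else {
                notPartitioned.add(writer);
            }
        }
        System.out.println(partitioned.size() + " partitioned, "
                + notPartitioned.size() + " not partitioned");
    }
}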
@@ -69,9 +69,9 @@ public class BatchReduceTest {
     @Test
     public void test() throws Exception {
-        StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000);
+        StreamExecutionEnvironment context = new StreamExecutionEnvironment();
         DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource())
-                .batchReduce(new MyBatchReduce()).addSink(new MySink());
+                .batch(4).batchReduce(new MyBatchReduce()).addSink(new MySink());
         context.execute();
     }
......
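The test now sets the batch size per stream via .batch(4) rather than through the environment constructor. MyBatchReduce itself is not shown in this hunk, so the standalone sketch below only illustrates what reducing fixed-size batches of four amounts to; the averaging is an assumption for illustration, not taken from the commit.

import java.util.ArrayList;
import java.util.List;

public class BatchReduceSketch {

    public static void main(String[] args) {
        int batchSize = 4;
        double[] input = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0};
        List<Double> batch = new ArrayList<Double>(batchSize);

        for (double value : input) {
            batch.add(value);
            if (batch.size() == batchSize) {
                double sum = 0.0;
                for (double d : batch) {
                    sum += d;
                }
                // One result per full batch, as a batchReduce over windows of 4 would produce.
                System.out.println("batch average: " + sum / batchSize);
                batch.clear();
            }
        }
    }
}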
@@ -15,10 +15,15 @@
 package eu.stratosphere.streaming.api;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.Test;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
 public class StreamCollector2Test {
@@ -26,14 +31,23 @@ public class StreamCollector2Test {
     @Test
     public void testCollect() {
-        int[] batchSizesOfNotPartitioned = new int[] {};
-        int[] batchSizesOfPartitioned = new int[] {2, 2};
-        int[] parallelismOfOutput = new int[] {2, 1};
+        List<Integer> batchSizesOfNotPartitioned = new ArrayList<Integer>();
+        List<Integer> batchSizesOfPartitioned = new ArrayList<Integer>();
+        batchSizesOfPartitioned.add(2);
+        batchSizesOfPartitioned.add(2);
+        List<Integer> parallelismOfOutput = new ArrayList<Integer>();
+        parallelismOfOutput.add(2);
+        parallelismOfOutput.add(2);
         int keyPosition = 0;
         long batchTimeout = 1000;
         int channelID = 1;
-        collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, null);
+        List<RecordWriter<StreamRecord>> fOut = new ArrayList<RecordWriter<StreamRecord>>();
+        fOut.add(null);
+        fOut.add(null);
+        collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut, fOut);
         Tuple1<Integer> t = new Tuple1<Integer>();
         StreamCollector<Tuple> sc1 = new StreamCollector<Tuple>(1, batchTimeout, channelID, null);
......
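With keyPosition 0 and a parallelism of 2 per partitioned output, the sub-collector a tuple lands in can be checked by hand. The test tuple's actual key value is not visible in this hunk, so the quick standalone check below uses 0 purely as an example; PartitionCheck is not part of the test.

public class PartitionCheck {

    public static void main(String[] args) {
        Integer keyField = Integer.valueOf(0);   // example value for field 0 of a Tuple1<Integer>
        int parallelism = 2;
        int index = Math.abs(keyField.hashCode()) % parallelism;
        System.out.println("a tuple with key 0 lands in sub-collector " + index);   // prints 0
    }
}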