Commit 8f77a938, authored by gyfora, committed by Stephan Ewen

[streaming] streamcollector refactor

Parent c3fe7358
......@@ -15,8 +15,6 @@
package eu.stratosphere.streaming.api;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
......@@ -32,17 +30,16 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
protected int counter = 0;
protected int channelID;
private long timeOfLastRecordEmitted = System.currentTimeMillis();;
private List<RecordWriter<StreamRecord>> outputs;
private RecordWriter<StreamRecord> output;
public StreamCollector(int batchSize, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate,
List<RecordWriter<StreamRecord>> outputs) {
SerializationDelegate<Tuple> serializationDelegate, RecordWriter<StreamRecord> output) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.streamRecord = new ArrayStreamRecord(batchSize);
this.streamRecord.setSeralizationDelegate(serializationDelegate);
this.channelID = channelID;
this.outputs = outputs;
this.output = output;
}
public StreamCollector(int batchSize, long batchTimeout, int channelID,
......@@ -53,7 +50,6 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
// TODO reconsider emitting mechanism at timeout (find a place to timeout)
@Override
public void collect(T tuple) {
//TODO: move copy to StreamCollector2
streamRecord.setTuple(counter, tuple);
counter++;
......@@ -77,19 +73,14 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
counter = 0;
streamRecord.setId(channelID);
if (outputs == null) {
System.out.println(streamRecord);
} else {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(streamRecord);
output.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
}
}
try {
output.emit(streamRecord);
output.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
}
}
@Override
......
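For orientation, the shape of the collector after this change, condensed into one sketch: a single `RecordWriter<StreamRecord>` replaces the old `List<RecordWriter<StreamRecord>>`, so a full batch is emitted and flushed directly instead of looping over writers or falling back to `System.out` when the list is null. This is a sketch, not the exact class body: the class name, the `Collector` and `ArrayStreamRecord` imports, and the batch-full check folded into `collect()` are assumptions; the field names, record calls, and emit/flush error handling follow the diff.

```java
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord; // assumed location
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; // assumed Collector interface

// Condensed sketch of the refactored collector; the timeout path and the real
// close() body are collapsed in the diff and omitted here.
public class SingleOutputStreamCollectorSketch<T extends Tuple> implements Collector<T> {

	protected StreamRecord streamRecord;
	protected int batchSize;
	protected int counter = 0;
	protected int channelID;

	// One writer per collector; before the refactor this was a List<RecordWriter<StreamRecord>>.
	private RecordWriter<StreamRecord> output;

	public SingleOutputStreamCollectorSketch(int batchSize, long batchTimeout, int channelID,
			SerializationDelegate<Tuple> serializationDelegate, RecordWriter<StreamRecord> output) {
		// batchTimeout drives the timeout-based emit, which this sketch leaves out.
		this.batchSize = batchSize;
		this.streamRecord = new ArrayStreamRecord(batchSize);
		this.streamRecord.setSeralizationDelegate(serializationDelegate);
		this.channelID = channelID;
		this.output = output;
	}

	@Override
	public void collect(T tuple) {
		streamRecord.setTuple(counter, tuple);
		counter++;
		if (counter == batchSize) { // batch-full check paraphrased from collapsed context
			counter = 0;
			streamRecord.setId(channelID);
			try {
				// Emit the batch to the single output and flush it; no null check
				// and no loop over a writer list any more.
				output.emit(streamRecord);
				output.flush();
			} catch (Exception e) {
				e.printStackTrace();
				System.out.println("emit fail");
			}
		}
	}

	@Override
	public void close() {
	}
}
```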
......@@ -33,8 +33,9 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
int keyPostition;
// TODO consider channelID
public StreamCollectorManager(List<Integer> batchSizesOfNotPartitioned, List<Integer> batchSizesOfPartitioned,
List<Integer> parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
public StreamCollectorManager(List<Integer> batchSizesOfNotPartitioned,
List<Integer> batchSizesOfPartitioned, List<Integer> parallelismOfOutput,
int keyPosition, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate,
List<RecordWriter<StreamRecord>> partitionedOutputs,
List<RecordWriter<StreamRecord>> notPartitionedOutputs) {
......@@ -47,19 +48,16 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
this.keyPostition = keyPosition;
for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
output.add(notPartitionedOutputs.get(i));
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, output));
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned
.get(i), batchTimeout, channelID, serializationDelegate, notPartitionedOutputs
.get(i)));
}
for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
for (int j = 0; j < collectors.length; j++) {
List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
output.add(partitionedOutputs.get(i));
collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, output);
batchTimeout, channelID, serializationDelegate, partitionedOutputs.get(i));
}
partitionedCollectors.add(collectors);
}
......@@ -69,7 +67,7 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
@Override
public void collect(T tuple) {
T copiedTuple = StreamRecord.copyTuple(tuple);
for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
collector.collect(copiedTuple);
}
......
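The manager-side change mirrors this: each `RecordWriter` is handed straight to a `StreamCollector` instead of being wrapped in a one-element `ArrayList` first. A minimal sketch of that wiring for the not-partitioned outputs, using a hypothetical helper class and method name (the real logic sits inside the `StreamCollectorManager` constructor):

```java
import java.util.ArrayList;
import java.util.List;

import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;

// Hypothetical helper mirroring the constructor loop over the not-partitioned outputs.
public final class CollectorWiringSketch {

	public static List<StreamCollector<Tuple>> wireNotPartitioned(
			List<Integer> batchSizesOfNotPartitioned, long batchTimeout, int channelID,
			SerializationDelegate<Tuple> serializationDelegate,
			List<RecordWriter<StreamRecord>> notPartitionedOutputs) {

		List<StreamCollector<Tuple>> collectors = new ArrayList<StreamCollector<Tuple>>();
		for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
			// Previously each writer was first wrapped in a one-element list and the
			// list was passed; now the writer itself goes to the collector constructor.
			collectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i),
					batchTimeout, channelID, serializationDelegate,
					notPartitionedOutputs.get(i)));
		}
		return collectors;
	}
}
```

The partitioned outputs are wired the same way, one collector per parallel instance, each again receiving `partitionedOutputs.get(i)` directly.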
......@@ -21,13 +21,10 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
......
......@@ -15,19 +15,21 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
......@@ -37,6 +39,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.MapTest.MyMap;
import eu.stratosphere.streaming.api.MapTest.MySink;
import eu.stratosphere.streaming.api.PrintTest.MyFlatMap;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -47,57 +50,193 @@ public class FlatMapTest {
public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void flatMap(Tuple1<Integer> value,
Collector<Tuple1<Integer>> out) throws Exception {
out.collect(new Tuple1<Integer>(value.f0*value.f0));
public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
out.collect(new Tuple1<Integer>(value.f0 * value.f0));
}
}
public static final class ParallelFlatMap extends
FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
numberOfElements++;
}
}
public static final class GenerateSequenceFlatMap extends
FlatMapFunction<Tuple1<Long>, Tuple1<Long>> {
@Override
public void flatMap(Tuple1<Long> value, Collector<Tuple1<Long>> out) throws Exception {
out.collect(new Tuple1<Long>(value.f0 * value.f0));
}
}
public static final class MySink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<Integer> tuple) {
result.add(tuple.f0);
System.out.println("result " + tuple.f0);
}
}
public static final class MySource extends SourceFunction<Tuple1<Integer>> {
public static final class FromElementsSink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<10; i++){
collector.collect(new Tuple1<Integer>(i));
}
public void invoke(Tuple1<Integer> tuple) {
fromElementsResult.add(tuple.f0);
}
}
public static final class FromCollectionSink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<Integer> tuple) {
fromCollectionResult.add(tuple.f0);
}
}
public static final class GenerateSequenceSink extends SinkFunction<Tuple1<Long>> {
@Override
public void invoke(Tuple1<Long> tuple) {
generateSequenceResult.add(tuple.f0);
}
}
private static void fillExpectedList() {
for (int i = 0; i < 10; i++) {
expected.add(i * i);
}
}
private static void fillFromElementsExpected() {
fromElementsExpected.add(4);
fromElementsExpected.add(25);
fromElementsExpected.add(81);
}
private static void fillSequenceSet() {
for (int i = 0; i < 10; i++) {
sequenceExpected.add(i * i);
}
}
private static void fillExpectedList(){
for(int i=0;i<10;i++){
expected.add(i*i);
private static void fillLongSequenceSet() {
for (int i = 0; i < 10; i++) {
sequenceLongExpected.add((long)(i * i));
}
}
private static void fillFromCollectionSet() {
if(fromCollectionSet.isEmpty()){
for (int i = 0; i < 10; i++) {
fromCollectionSet.add(i);
}
}
}
private static final int PARALELISM = 1;
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static int numberOfElements = 0;
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
private static Set<Integer> fromElementsExpected = new HashSet<Integer>();
private static Set<Integer> fromElementsResult = new HashSet<Integer>();
private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
private static Set<Integer> sequenceExpected = new HashSet<Integer>();
private static Set<Long> sequenceLongExpected = new HashSet<Long>();
private static Set<Integer> fromCollectionResult = new HashSet<Integer>();
private static Set<Long> generateSequenceResult = new HashSet<Long>();
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment(2, 1000);
DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(),1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
.flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
env.execute();
fillExpectedList();
assertTrue(expected.equals(result));
}
@Test
public void parallelShuffleconnectTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> source = env.fromCollection(fromCollectionSet);
DataStream<Tuple1<Integer>> map = source.flatMap(new ParallelFlatMap(), 1).addSink(
new MySink());
DataStream<Tuple1<Integer>> map2 = source.flatMap(new ParallelFlatMap(), 1).addSink(
new MySink());
env.execute();
assertEquals(20, numberOfElements);
numberOfElements=0;
}
@Test
public void fromElementsTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> map = env.fromElements(2, 5, 9).flatMap(new MyFlatMap(), 1);
DataStream<Tuple1<Integer>> sink = map.addSink(new FromElementsSink());
fillFromElementsExpected();
env.execute();
assertEquals(fromElementsExpected, fromElementsResult);
}
@Test
public void fromCollectionTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> map = env.fromCollection(fromCollectionSet).flatMap(
new MyFlatMap(), 1);
DataStream<Tuple1<Integer>> sink = map.addSink(new FromCollectionSink());
fillSequenceSet();
env.execute();
assertEquals(sequenceExpected, fromCollectionResult);
}
@Test
public void generateSequenceTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Long>> map = env.generateSequence(0, 9).flatMap(
new GenerateSequenceFlatMap(), 1);
DataStream<Tuple1<Long>> sink = map.addSink(new GenerateSequenceSink());
fillLongSequenceSet();
env.execute();
assertEquals(sequenceLongExpected, generateSequenceResult);
}
}
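Taken together, the rewritten tests follow one pattern: seed the input through a built-in source (`fromElements`, `fromCollection`, `generateSequence`), run the pipeline, and compare `Set`s so the assertion does not depend on arrival order. A condensed sketch of that pattern, reusing `MyFlatMap`, `FromElementsSink`, and the fill helper from the test class above; the test name itself is hypothetical:

```java
// Condensed sketch of the test pattern above; builds on the classes and static
// fields of FlatMapTest, so it is illustrative rather than standalone.
@Test
public void builtInSourcePatternSketch() throws Exception {
	StreamExecutionEnvironment env = new StreamExecutionEnvironment();

	// A built-in source replaces the old hand-written MySource invokable.
	env.fromElements(2, 5, 9)
			.flatMap(new MyFlatMap(), 1)      // squares each element
			.addSink(new FromElementsSink()); // collects into fromElementsResult

	fillFromElementsExpected();               // expected = {4, 25, 81}
	env.execute();

	// Sets make the check independent of the order in which tuples arrive.
	assertEquals(fromElementsExpected, fromElementsResult);
}
```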
......@@ -18,7 +18,9 @@ package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
......@@ -33,27 +35,6 @@ public class MapTest {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("source "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MyFieldsSource extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < MAXSOURCE; i++) {
collector.collect(new Tuple1<Integer>(5));
}
}
}
public static final class MyDiffFieldsSource extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 9; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
......@@ -63,7 +44,6 @@ public class MapTest {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
System.out.println("mymap "+map);
map++;
return new Tuple1<Integer>(value.f0 * value.f0);
}
......@@ -76,7 +56,6 @@ public class MapTest {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
counter++;
if (counter == MAXSOURCE)
allInOne = true;
return new Tuple1<Integer>(value.f0 * value.f0);
......@@ -140,13 +119,12 @@ public class MapTest {
@Override
public void invoke(Tuple1<Integer> tuple) {
System.out.println("sink "+graphResult);
graphResult++;
}
}
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
private static int broadcastResult = 0;
private static int shuffleResult = 0;
private static int fieldsResult = 0;
......@@ -157,19 +135,49 @@ public class MapTest {
private static final int MAXSOURCE = 10;
private static boolean allInOne = false;
private static boolean threeInAll = true;
private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
private static List<Integer> fromCollectionFields = new ArrayList<Integer>();
private static Set<Integer> fromCollectionDiffFieldsSet = new HashSet<Integer>();
private static void fillExpectedList() {
for (int i = 0; i < 10; i++) {
expected.add(i * i);
}
}
private static void fillFromCollectionSet() {
if(fromCollectionSet.isEmpty()){
for (int i = 0; i < 10; i++) {
fromCollectionSet.add(i);
}
}
}
private static void fillFromCollectionFieldsSet() {
if(fromCollectionFields.isEmpty()){
for (int i = 0; i < MAXSOURCE; i++) {
fromCollectionFields.add(5);
}
}
}
private static void fillFromCollectionDiffFieldsSet() {
if(fromCollectionDiffFieldsSet.isEmpty()){
for (int i = 0; i < 9; i++) {
fromCollectionDiffFieldsSet.add(i);
}
}
}
@Test
public void mapTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(), 1)
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
.map(new MyMap(), PARALELISM).addSink(new MySink());
env.execute();
......@@ -182,8 +190,11 @@ public class MapTest {
@Test
public void broadcastSinkTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MySource(), 1)
.fromCollection(fromCollectionSet)
.broadcast()
.map(new MyMap(), 3)
.addSink(new MyBroadcastSink());
......@@ -196,8 +207,11 @@ public class MapTest {
@Test
public void shuffleSinkTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MySource(), 1)
.fromCollection(fromCollectionSet)
.map(new MyMap(), 3)
.addSink(new MyShufflesSink());
env.execute();
......@@ -222,8 +236,11 @@ public class MapTest {
@Test
public void fieldsMapTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionFieldsSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MyFieldsSource(), 1)
.fromCollection(fromCollectionFields)
.partitionBy(0)
.map(new MyFieldsMap(), 3)
.addSink(new MyFieldsSink());
......@@ -236,8 +253,11 @@ public class MapTest {
@Test
public void diffFieldsMapTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionDiffFieldsSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MyDiffFieldsSource(), 1)
.fromCollection(fromCollectionDiffFieldsSet)
.partitionBy(0)
.map(new MyDiffFieldsMap(), 3)
.addSink(new MyDiffFieldsSink());
......
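MapTest moves in the same direction: the `MySource`, `MyFieldsSource`, and `MyDiffFieldsSource` invokables give way to pre-filled collections fed through `env.fromCollection`, with `isEmpty()` guards on the fill helpers because the static sets are shared across test methods, and the expected/result lists become `Set`s. A short sketch of the resulting shape of a test, reusing `MyMap`, `MySink`, `PARALELISM`, and the fill helpers from MapTest; the test name is hypothetical:

```java
// Sketch of the fill-then-source idiom now used by the MapTest cases;
// relies on MapTest's static fields, so it is illustrative only.
@Test
public void mapPatternSketch() throws Exception {
	StreamExecutionEnvironment env = new StreamExecutionEnvironment();
	fillFromCollectionSet(); // guarded with isEmpty(), safe to call from several tests

	// fromCollection replaces the old MySource invokable as the input.
	env.fromCollection(fromCollectionSet)
			.map(new MyMap(), PARALELISM)
			.addSink(new MySink());

	env.execute();
	fillExpectedList();
	assertEquals(expected, result); // Set comparison, order-independent
}
```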
......@@ -65,11 +65,8 @@ public class StreamCollectorTest {
@Test
public void recordWriter() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
ArrayList<RecordWriter<StreamRecord>> rwList = new ArrayList<RecordWriter<StreamRecord>>();
rwList.add(recWriter);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, recWriter);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......