From 6c2a1b3e93c8230ff90cf754a33d01769fdcf54d Mon Sep 17 00:00:00 2001 From: gyfora Date: Mon, 14 Jul 2014 16:29:20 +0200 Subject: [PATCH] [streaming] refactor and jbuilder update for StreamRecord2 --- .../streaming/api/BatchReduceInvokable.java | 3 ++- .../stratosphere/streaming/api/FilterInvokable.java | 3 ++- .../stratosphere/streaming/api/FlatMapInvokable.java | 3 ++- .../stratosphere/streaming/api/JobGraphBuilder.java | 4 ++++ .../eu/stratosphere/streaming/api/MapInvokable.java | 3 ++- .../eu/stratosphere/streaming/api/SinkInvokable.java | 3 ++- .../api/invokable/DefaultSinkInvokable.java | 4 ++-- .../api/invokable/DefaultTaskInvokable.java | 4 ++-- .../api/invokable/StreamRecordInvokable.java | 4 ++-- .../api/streamcomponent/StreamComponentHelper.java | 12 ++++++++++-- 10 files changed, 30 insertions(+), 13 deletions(-) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java index e5151a6c66b..dc5c24b6649 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/BatchReduceInvokable.java @@ -20,6 +20,7 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class BatchReduceInvokable extends UserTaskInvokable { private static final long serialVersionUID = 1L; @@ -30,7 +31,7 @@ public class BatchReduceInvokable extends U } @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { Iterator iterator = (Iterator) record.getBatchIterable().iterator(); reducer.reduce(iterator, collector); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FilterInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FilterInvokable.java index 63e51153e07..c821845cd1b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FilterInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FilterInvokable.java @@ -18,6 +18,7 @@ import eu.stratosphere.api.java.functions.FilterFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class FilterInvokable extends UserTaskInvokable { FilterFunction filterFunction; @@ -27,7 +28,7 @@ public class FilterInvokable extends UserTaskInvokable } @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { for (int i = 0; i < record.getBatchSize(); i++) { IN tuple = (IN) record.getTuple(i); if (filterFunction.filter(tuple)) { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java index 619ac0bd07a..4a170e0a5f7 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FlatMapInvokable.java @@ -19,6 +19,7 @@ import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class FlatMapInvokable extends UserTaskInvokable { private static final long serialVersionUID = 1L; @@ -29,7 +30,7 @@ public class FlatMapInvokable extends UserTask } @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { int batchSize = record.getBatchSize(); for (int i = 0; i < batchSize; i++) { @SuppressWarnings("unchecked") diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java index 556b20ede76..b22642529c7 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java @@ -460,6 +460,10 @@ public class JobGraphBuilder { "partitionerIntParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition); + config.setInteger( + "numOfOutputs_" + + (upStreamComponent.getNumberOfForwardConnections() - 1), numberOfInstances.get(downStreamComponentName)); + addOutputChannels(upStreamComponentName, 1); if (log.isDebugEnabled()) { log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> " diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java index 544423d5758..8831cd3493a 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/MapInvokable.java @@ -19,6 +19,7 @@ import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class MapInvokable extends UserTaskInvokable { private static final long serialVersionUID = 1L; @@ -29,7 +30,7 @@ public class MapInvokable extends UserTaskInvo } @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { int batchSize = record.getBatchSize(); for (int i = 0; i < batchSize; i++) { @SuppressWarnings("unchecked") diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java index 3260711fe3c..9f8245a10e3 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkInvokable.java @@ -18,6 +18,7 @@ package eu.stratosphere.streaming.api; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class SinkInvokable extends UserSinkInvokable { private static final long serialVersionUID = 1L; @@ -29,7 +30,7 @@ public class SinkInvokable extends UserSinkInvokable { } @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { int batchSize = record.getBatchSize(); for (int i = 0; i < batchSize; i++) { @SuppressWarnings("unchecked") diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultSinkInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultSinkInvokable.java index cb4198b63b2..2eb7b1f77a2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultSinkInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultSinkInvokable.java @@ -16,15 +16,15 @@ package eu.stratosphere.streaming.api.invokable; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class DefaultSinkInvokable extends UserSinkInvokable { private static final long serialVersionUID = 1L; @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { String value = (String) record.getTuple(0).getField(0); System.out.println(value); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultTaskInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultTaskInvokable.java index ee0bf1e7882..b11f21db42c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultTaskInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/DefaultTaskInvokable.java @@ -16,15 +16,15 @@ package eu.stratosphere.streaming.api.invokable; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public class DefaultTaskInvokable extends UserTaskInvokable { private static final long serialVersionUID = 1L; @Override - public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + public void invoke(StreamRecord record, Collector collector) throws Exception { // TODO Auto-generated method stub } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamRecordInvokable.java index f1aba9008f9..05a8d6ddca4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamRecordInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamRecordInvokable.java @@ -16,11 +16,11 @@ package eu.stratosphere.streaming.api.invokable; import eu.stratosphere.api.java.tuple.Tuple; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.util.Collector; public abstract class StreamRecordInvokable extends StreamComponent { - public abstract void invoke(StreamRecord record, StreamCollector collector) + public abstract void invoke(StreamRecord record, Collector collector) throws Exception; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index a7030904b03..11624ca00c4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -46,6 +46,7 @@ import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.streaming.api.SinkFunction; import eu.stratosphere.streaming.api.StreamCollector; +import eu.stratosphere.streaming.api.StreamCollector2; import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable; import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; @@ -62,6 +63,7 @@ import eu.stratosphere.streaming.faulttolerance.FailEventListener; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner; +import eu.stratosphere.util.Collector; public final class StreamComponentHelper { private static final Log log = LogFactory.getLog(StreamComponentHelper.class); @@ -75,9 +77,10 @@ public final class StreamComponentHelper { private TupleSerializer outTupleSerializer = null; private SerializationDelegate outSerializationDelegate = null; - public StreamCollector collector; + public Collector collector; private List batchsizes_s = new ArrayList(); private List batchsizes_f = new ArrayList(); + private List numOfOutputs_f = new ArrayList(); private int keyPosition = 0; private List> outputs_s = new ArrayList>(); @@ -112,7 +115,7 @@ public final class StreamComponentHelper { } - public StreamCollector setCollector(Configuration taskConfiguration, int id, + public Collector setCollector(Configuration taskConfiguration, int id, List> outputs) { int batchSize = taskConfiguration.getInteger("batchSize", 1); @@ -120,6 +123,10 @@ public final class StreamComponentHelper { long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000); collector = new StreamCollector(batchSize, batchTimeout, id, outSerializationDelegate, outputs); + + // collector = new StreamCollector2(batchsizes_s.toArray(), + // batchsizes_f.toArray(),numOfOutputs_f , keyPosition, batchTimeout, + // id, outSerializationDelegate, outputs); return collector; } @@ -340,6 +347,7 @@ public final class StreamComponentHelper { try { if (partitioner.equals(FieldsPartitioner.class)) { batchsizes_f.add(batchSize); + numOfOutputs_f.add(taskConfiguration.getInteger("numOfOutputs_" + nrOutput, -1)); // TODO:force one partitioning field keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1); -- GitLab