diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..19cd70fb4e0efbe46ea6e2435b6075176b7ef38f --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java @@ -0,0 +1,53 @@ +package eu.stratosphere.streaming.api; + +import java.util.List; +import java.util.Map; + +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.event.task.EventListener; +import eu.stratosphere.nephele.io.ChannelSelector; +import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.streaming.partitioner.DefaultPartitioner; +import eu.stratosphere.streaming.partitioner.FieldsPartitioner; +import eu.stratosphere.types.Key; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; + +public final class StreamComponentFactory { + //TODO: put setConfigInputs here + + public static void setAckListener(Map recordBuffer, + String sourceInstanceID, List> outputs) { + EventListener eventListener = new AckEventListener(sourceInstanceID, + recordBuffer); + for (RecordWriter output : outputs) { + // TODO: separate outputs + output.subscribeToEvent(eventListener, AckEvent.class); + } + } + + public static void setPartitioner(Configuration taskConfiguration, + int nrOutput, List> partitioners) { + Class> partitioner = taskConfiguration + .getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class, + ChannelSelector.class); + + try { + if (partitioner.equals(FieldsPartitioner.class)) { + int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + + nrOutput, 1); + Class keyClass = taskConfiguration.getClass( + "partitionerClassParam_" + nrOutput, StringValue.class, Key.class); + + partitioners.add(partitioner.getConstructor(int.class, Class.class) + .newInstance(keyPosition, keyClass)); + + } else { + partitioners.add(partitioner.newInstance()); + } + } catch (Exception e) { + System.out.println("partitioner error" + " " + "partitioner_" + nrOutput); + System.out.println(e); + } + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java index 70988cda17dbc238ed98b5cfc683a296cb37c8b1..1a91f6dc4c7cb584b96bb0aa4b86b1ab4574bf13 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java @@ -21,18 +21,13 @@ import java.util.Map; import java.util.TreeMap; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; -import eu.stratosphere.streaming.partitioner.DefaultPartitioner; -import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.streaming.test.RandIS; -import eu.stratosphere.types.Key; import eu.stratosphere.types.Record; -import eu.stratosphere.types.StringValue; public class StreamSource extends AbstractInputTask { @@ -72,7 +67,7 @@ public class StreamSource extends AbstractInputTask { numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0); for (int i = 1; i <= numberOfOutputs; i++) { - setPartitioner(taskConfiguration, i); + StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners); } for (ChannelSelector outputPartitioner : partitioners) { @@ -81,63 +76,24 @@ public class StreamSource extends AbstractInputTask { } setUserFunction(taskConfiguration); - setAckListener(); - + StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID, outputs); } public void setUserFunction(Configuration taskConfiguration) { - Class userFunctionClass = taskConfiguration .getClass("userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class); - try { this.userFunction = userFunctionClass.newInstance(); this.userFunction.declareOutputs(outputs, sourceInstanceID, recordBuffer); } catch (Exception e) { } - } - - private void setPartitioner(Configuration taskConfiguration, int nrOutput) { - Class> partitioner = taskConfiguration - .getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class, - ChannelSelector.class); - - try { - if (partitioner.equals(FieldsPartitioner.class)) { - int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" - + nrOutput, 1); - Class keyClass = taskConfiguration.getClass( - "partitionerClassParam_" + nrOutput, StringValue.class, Key.class); - - partitioners.add(partitioner.getConstructor(int.class, Class.class) - .newInstance(keyPosition, keyClass)); - - } else { - partitioners.add(partitioner.newInstance()); - } - } catch (Exception e) { - System.out.println("partitioner error" + " " + "partitioner_" + nrOutput); - System.out.println(e); - } - - } - - public void setAckListener() { - EventListener eventListener = new AckEventListener(sourceInstanceID, - recordBuffer); - for (RecordWriter output : outputs) { - // TODO: separate outputs - output.subscribeToEvent(eventListener, AckEvent.class); - } - } - + @Override public void registerInputOutput() { setConfigInputs(); - } @Override diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java index e546d4ec1f29a64559b6ef08fdf23e8e00f11e57..9c58233dbae7f5828bea06b32bf0e26ddb6bdb47 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java @@ -21,18 +21,13 @@ import java.util.Map; import java.util.TreeMap; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; -import eu.stratosphere.streaming.partitioner.DefaultPartitioner; -import eu.stratosphere.streaming.partitioner.FieldsPartitioner; -import eu.stratosphere.types.Key; import eu.stratosphere.types.Record; -import eu.stratosphere.types.StringValue; //TODO: Refactor, create common ancestor with StreamSource public class StreamTask extends AbstractTask { @@ -43,7 +38,7 @@ public class StreamTask extends AbstractTask { private UserTaskInvokable userFunction; private int numberOfInputs; private int numberOfOutputs; - + private static int numTasks = 0; private String taskInstanceID = ""; private Map recordBuffer; @@ -62,7 +57,6 @@ public class StreamTask extends AbstractTask { } private void setConfigInputs() { - Configuration taskConfiguration = getTaskConfiguration(); numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); @@ -73,7 +67,7 @@ public class StreamTask extends AbstractTask { numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0); for (int i = 1; i <= numberOfOutputs; i++) { - setPartitioner(taskConfiguration, i); + StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners); } for (ChannelSelector outputPartitioner : partitioners) { @@ -82,20 +76,10 @@ public class StreamTask extends AbstractTask { } setUserFunction(taskConfiguration); - setAckListener(); - } - - public void setAckListener() { - EventListener eventListener = new AckEventListener(taskInstanceID, - recordBuffer); - for (RecordWriter output : outputs) { - // TODO: separate outputs - output.subscribeToEvent(eventListener, AckEvent.class); - } + StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID, outputs); } public void setUserFunction(Configuration taskConfiguration) { - Class userFunctionClass = taskConfiguration .getClass("userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class); @@ -105,32 +89,6 @@ public class StreamTask extends AbstractTask { } catch (Exception e) { } - - } - - private void setPartitioner(Configuration taskConfiguration, int nrOutput) { - Class> partitioner = taskConfiguration - .getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class, - ChannelSelector.class); - - try { - if (partitioner.equals(FieldsPartitioner.class)) { - int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" - + nrOutput, 1); - Class keyClass = taskConfiguration.getClass( - "partitionerClassParam_" + nrOutput, StringValue.class, Key.class); - - partitioners.add(partitioner.getConstructor(int.class, Class.class) - .newInstance(keyPosition, keyClass)); - - } else { - partitioners.add(partitioner.newInstance()); - } - } catch (Exception e) { - System.out.println("partitioner error" + " " + "partitioner_" + nrOutput); - System.out.println(e); - } - } @Override @@ -148,7 +106,7 @@ public class StreamTask extends AbstractTask { hasInput = true; StreamRecord streamRecord = new StreamRecord(input.next()); String id = streamRecord.popId(); - //TODO: Enclose invoke in try-catch to properly fail records + // TODO: Enclose invoke in try-catch to properly fail records userFunction.invoke(streamRecord.getRecord()); System.out.println(this.getClass().getName() + "-" + taskInstanceID); System.out.println(recordBuffer.toString());