diff --git a/flink-addons/flink-streaming/.travis.yml b/flink-addons/flink-streaming/.travis.yml
deleted file mode 100644
index 99bb65870379a1f836baa3607d951b128f891885..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/.travis.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-
-language: java
-jdk:
- - oraclejdk8
- - openjdk7
- - openjdk6
-
-
-
-script: "mvn -B clean verify"
-
diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index 3fa883574e5ee017fc9436b60be9733a49accbb8..1dc5138828387d6057b5f838a3828ca01246752a 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -103,6 +103,20 @@
-Xmx1024m
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.16
+
+ -Xmx1024m
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.6
+
org.apache.maven.plugins
maven-eclipse-plugin
@@ -117,6 +131,28 @@
true
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 1.3.1
+
+
+ enforce-maven
+
+ enforce
+
+
+
+
+ (,2.1.0),(2.1.0,2.2.0),(2.2.0,)
+ Maven 2.1.0 and 2.2.0 produce incorrect GPG signatures and checksums respectively.
+
+
+
+
+
+
org.apache.maven.plugins
maven-source-plugin
@@ -158,6 +194,18 @@
-Xmx1024m
+
+ maven-failsafe-plugin
+ 2.7
+
+
+ WARN
+
+ always
+ 1
+ false
+
+
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java
index 86ee5a134414df4f1db11c2a4d1888af7779ca5b..f3422c28dfdbcfe0ed1dd9bf8b6db948712007d7 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java
@@ -1,14 +1,16 @@
package eu.stratosphere.streaming.api;
+import java.util.Map;
+
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
public class AckEventListener implements EventListener {
private String taskInstanceID;
- private FaultTolerancyBuffer recordBuffer;
+ private Map recordBuffer;
- public AckEventListener(String taskInstanceID,FaultTolerancyBuffer recordBuffer) {
+ public AckEventListener(String taskInstanceID,Map recordBuffer) {
this.taskInstanceID=taskInstanceID;
this.recordBuffer=recordBuffer;
}
@@ -19,8 +21,8 @@ public class AckEventListener implements EventListener {
String ackCID = recordId.split("-", 2)[0];
if (ackCID.equals(taskInstanceID)) {
System.out.println("Ack recieved " + ackEvent.getRecordId());
- recordBuffer.ackRecord(ackEvent.getRecordId());
- System.out.println(recordBuffer.getRecordBuffer());
+ recordBuffer.remove(ackEvent.getRecordId());
+ System.out.println(recordBuffer);
System.out.println("---------------------");
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FailEvent.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FailEvent.java
deleted file mode 100644
index 6a5c2c4f6fa4e55e4d50c74922dedb03e0345259..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FailEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package eu.stratosphere.streaming.api;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-
-public class FailEvent extends AbstractTaskEvent {
- private String recordId;
-
- public FailEvent(String recordId) {
- setRecordId(recordId);
- System.out.println("Fail sent " + recordId);
- System.out.println("---------------------");
-
- }
-
- @Override
- public void write(DataOutput out) throws IOException {}
-
- @Override
- public void read(DataInput in) throws IOException {}
-
- public void setRecordId(String recordId) {
- this.recordId = recordId;
- }
- public String getRecordId() {
- return this.recordId;
- }
-}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FailEventListener.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FailEventListener.java
deleted file mode 100644
index 3820d59b46db65726c154a866fe1852eb3cf59a5..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FailEventListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package eu.stratosphere.streaming.api;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-
-public class FailEventListener implements EventListener {
-
- private String taskInstanceID;
- private FaultTolerancyBuffer recordBuffer;
-
- public FailEventListener(String taskInstanceID,
- FaultTolerancyBuffer recordBuffer) {
- this.taskInstanceID = taskInstanceID;
- this.recordBuffer = recordBuffer;
- }
-
- public void eventOccurred(AbstractTaskEvent event) {
- FailEvent failEvent = (FailEvent) event;
- String recordId = failEvent.getRecordId();
- String failCID = recordId.split("-", 2)[0];
- if (failCID.equals(taskInstanceID)) {
- System.out.println("Fail recieved " + recordId);
- recordBuffer.failRecord(recordId);
- System.out.println(recordBuffer.getRecordBuffer());
- System.out.println("---------------------");
-
- }
-
- }
-}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java
deleted file mode 100644
index e9c7881a3c6cc8354730d1836addc89612cbc346..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package eu.stratosphere.streaming.api;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.types.Record;
-
-public class FaultTolerancyBuffer {
-
- private Map recordBuffer;
- private Map ackCounter;
- private List> outputs;
- private String channelID;
-
- private int numberOfOutputs;
-
- public FaultTolerancyBuffer(List> outputs, String channelID) {
- this.outputs=outputs;
- this.recordBuffer = new HashMap();
- this.ackCounter = new HashMap();
- this.numberOfOutputs = outputs.size();
- this.channelID=channelID;
-
- }
-
- public void addRecord(StreamRecord streamRecord) {
-
- recordBuffer.put(streamRecord.getId(), streamRecord);
- ackCounter.put(streamRecord.getId(), numberOfOutputs);
- }
-
- public Record popRecord(String recordID) {
- Record record = recordBuffer.get(recordID).getRecord();
- recordBuffer.remove(recordID);
- ackCounter.remove(recordID);
- return record;
- }
-
- public void ackRecord(String recordID) {
-
- if (ackCounter.containsKey(recordID)) {
- int ackCount = ackCounter.get(recordID) - 1;
- ackCounter.put(recordID, ackCount);
-
- if (ackCount == 0) {
- recordBuffer.remove(recordID);
- ackCounter.remove(recordID);
- }
- }
-
- }
-
- public void failRecord(String recordID) {
- // Create new id to avoid double counting acks
- StreamRecord newRecord = new StreamRecord(popRecord(recordID),channelID).addId();
- addRecord(newRecord);
- reEmit(newRecord.getRecordWithId());
-
- }
-
- public void reEmit(Record record) {
- for (RecordWriter output : outputs) {
- try {
- output.emit(record);
- } catch (Exception e) {
- System.out.println("Re-emit failed");
- }
- }
-
- }
-
- public Map getRecordBuffer() {
- return this.recordBuffer;
- }
-
-}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index a94cd32b566d5088a92353588ac158f5570a25ab..1135ff7a02155860abbb2fe70e9ce10f6858aee8 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -31,9 +31,6 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
-import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
-import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SerializableStreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SerializableStreamRecord.java
new file mode 100644
index 0000000000000000000000000000000000000000..d0d7ba7de711245f1a546e4ceaa61d791a66c3aa
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SerializableStreamRecord.java
@@ -0,0 +1,84 @@
+package eu.stratosphere.streaming.api;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Random;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.types.IntValue;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.types.Value;
+
+public class SerializableStreamRecord implements IOReadableWritable, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private Value[] fields;
+ private StringValue uid = new StringValue("");
+ private String channelID = "";
+ private int numOfFields;
+
+ public SerializableStreamRecord(int length) {
+ this.numOfFields = length;
+ fields = new Value[length];
+ }
+
+ public SerializableStreamRecord(int length, String channelID) {
+ this(length);
+ this.channelID = channelID;
+ }
+
+ public int getNumOfFields() {
+ return numOfFields;
+ }
+
+ public SerializableStreamRecord setId() {
+ Random rnd = new Random();
+ uid.setValue(channelID + "-" + rnd.nextInt(1000));
+ return this;
+ }
+
+ public String getId() {
+ return uid.getValue();
+ }
+
+ public Value getField(int fieldNumber) {
+ return fields[fieldNumber];
+ }
+
+ public void setField(int fieldNumber, Value value) {
+ fields[fieldNumber] = value;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ uid.write(out);
+
+ // Write the number of fields with an IntValue
+ (new IntValue(numOfFields)).write(out);
+
+ // Write the fields
+ for (int i = 0; i < numOfFields; i++) {
+ fields[i].write(out);
+ }
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ uid.read(in);
+
+ // Get the number of fields
+ IntValue numOfFieldsValue = new IntValue(0);
+ numOfFieldsValue.read(in);
+ numOfFields = numOfFieldsValue.getValue();
+
+ // Make sure the fields have numOfFields elements
+ fields = new Value[numOfFields];
+
+ // Read the fields
+ for (int i = 0; i < numOfFields; i++) {
+ fields[i].read(in);
+ }
+ }
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..3f932ec51d71d6ac3da7b18fd5e80ece48782ac8
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java
@@ -0,0 +1,115 @@
+package eu.stratosphere.streaming.api;
+
+import java.util.List;
+import java.util.Map;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.nephele.event.task.EventListener;
+import eu.stratosphere.nephele.io.ChannelSelector;
+import eu.stratosphere.nephele.io.RecordReader;
+import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
+import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
+import eu.stratosphere.types.Key;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.types.StringValue;
+
+public final class StreamComponentFactory {
+
+ public static void setAckListener(Map recordBuffer,
+ String sourceInstanceID, List> outputs) {
+ EventListener eventListener = new AckEventListener(sourceInstanceID,
+ recordBuffer);
+ for (RecordWriter output : outputs) {
+ // TODO: separate outputs
+ output.subscribeToEvent(eventListener, AckEvent.class);
+ }
+ }
+
+ // for StreamTask
+ public static int setConfigInputs(StreamTask taskBase,
+ Configuration taskConfiguration, List> inputs) {
+ int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
+ for (int i = 0; i < numberOfInputs; i++) {
+ inputs.add(new RecordReader(taskBase, Record.class));
+ }
+ return numberOfInputs;
+ }
+
+ // this function can be removed as duplication of the above function if
+ // modification on kernel is allowed.
+ // for StreamSink
+ public static int setConfigInputs(StreamSink taskBase,
+ Configuration taskConfiguration, List> inputs) {
+ int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
+ for (int i = 0; i < numberOfInputs; i++) {
+ inputs.add(new RecordReader(taskBase, Record.class));
+ }
+ return numberOfInputs;
+ }
+
+ //for StreamTask
+ public static int setConfigOutputs(StreamTask taskBase,
+ Configuration taskConfiguration,
+ List> outputs,
+ List> partitioners) {
+ int numberOfOutputs = taskConfiguration
+ .getInteger("numberOfOutputs", 0);
+ for (int i = 1; i <= numberOfOutputs; i++) {
+ StreamComponentFactory.setPartitioner(taskConfiguration, i,
+ partitioners);
+ }
+ for (ChannelSelector outputPartitioner : partitioners) {
+ outputs.add(new RecordWriter(taskBase, Record.class,
+ outputPartitioner));
+ }
+ return numberOfOutputs;
+ }
+
+ // this function can be removed as duplication of the above function if
+ // modification on kernel is allowed.
+ // for StreamSource
+ public static int setConfigOutputs(StreamSource taskBase,
+ Configuration taskConfiguration,
+ List> outputs,
+ List> partitioners) {
+ int numberOfOutputs = taskConfiguration
+ .getInteger("numberOfOutputs", 0);
+ for (int i = 1; i <= numberOfOutputs; i++) {
+ StreamComponentFactory.setPartitioner(taskConfiguration, i,
+ partitioners);
+ }
+ for (ChannelSelector outputPartitioner : partitioners) {
+ outputs.add(new RecordWriter(taskBase, Record.class,
+ outputPartitioner));
+ }
+ return numberOfOutputs;
+ }
+
+ public static void setPartitioner(Configuration taskConfiguration,
+ int nrOutput, List> partitioners) {
+ Class extends ChannelSelector> partitioner = taskConfiguration
+ .getClass("partitionerClass_" + nrOutput,
+ DefaultPartitioner.class, ChannelSelector.class);
+
+ try {
+ if (partitioner.equals(FieldsPartitioner.class)) {
+ int keyPosition = taskConfiguration.getInteger(
+ "partitionerIntParam_" + nrOutput, 1);
+ Class extends Key> keyClass = taskConfiguration.getClass(
+ "partitionerClassParam_" + nrOutput, StringValue.class,
+ Key.class);
+
+ partitioners.add(partitioner.getConstructor(int.class,
+ Class.class).newInstance(keyPosition, keyClass));
+
+ } else {
+ partitioners.add(partitioner.newInstance());
+ }
+ } catch (Exception e) {
+ System.out.println("partitioner error" + " " + "partitioner_"
+ + nrOutput);
+ System.out.println(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java
index 1587a1dce5e2927e71001e388ed14deaa8b3e1d2..455ac8b2876dadabb90064a5d41a7362105b4997 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java
@@ -13,7 +13,7 @@ public final class StreamRecord {
private String channelID = "";
public StreamRecord(Record record) {
- this.record = record.createCopy();
+ this.record = record;
}
public StreamRecord(Record record, String channelID) {
@@ -40,13 +40,7 @@ public final class StreamRecord {
}
public Record getRecord() {
- Record newRecord=this.record.createCopy();
- newRecord.removeField(newRecord.getNumFields() - 1);
- return newRecord;
- }
-
- public Record getRecordWithId() {
- return this.record;
+ return record;
}
// TODO:write proper toString
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSink.java
similarity index 75%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSink.java
index 02f32b754205b847805026d3529ccdedc9bc8b73..9a665a57d796c2b739b925eff7d51cc715382081 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSink.java
@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.api.streamcomponent;
+package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
@@ -21,8 +21,7 @@ import java.util.List;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.streaming.api.AckEvent;
-import eu.stratosphere.streaming.api.StreamRecord;
+import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record;
@@ -30,24 +29,33 @@ public class StreamSink extends AbstractOutputTask {
private List> inputs;
private UserSinkInvokable userFunction;
+ private int numberOfInputs;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList>();
userFunction = null;
+ numberOfInputs = 0;
}
- @Override
- public void registerInputOutput() {
- Configuration taskConfiguration = getTaskConfiguration();
- StreamComponentHelper streamSinkHelper = new StreamComponentHelper();
-
+ public void setUserFunction(Configuration taskConfiguration) {
+ Class extends UserSinkInvokable> userFunctionClass = taskConfiguration
+ .getClass("userfunction", DefaultSinkInvokable.class,
+ UserSinkInvokable.class);
try {
- streamSinkHelper.setConfigInputs(this, taskConfiguration, inputs);
+ userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
- e.printStackTrace();
+
}
- userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
+ }
+
+ @Override
+ public void registerInputOutput() {
+ Configuration taskConfiguration = getTaskConfiguration();
+ // setConfigInputs(taskConfiguration);
+ numberOfInputs = StreamComponentFactory.setConfigInputs(this,
+ taskConfiguration, inputs);
+ setUserFunction(taskConfiguration);
}
@Override
@@ -59,7 +67,7 @@ public class StreamSink extends AbstractOutputTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord rec = new StreamRecord(input.next());
- String id = rec.getId();
+ String id = rec.popId();
userFunction.invoke(rec.getRecord());
input.publishEvent(new AckEvent(id));
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java
similarity index 71%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java
index 1f51d674d7f7b6d692d36ca27423928632f19e06..cd6c6fe67bf5ee0136b31435f51f2d7c5fb5cbf2 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamSource.java
@@ -13,16 +13,18 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.api.streamcomponent;
+package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
+import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Record;
@@ -32,17 +34,21 @@ public class StreamSource extends AbstractInputTask {
private List> outputs;
private List> partitioners;
private UserSourceInvokable userFunction;
+ private int numberOfOutputs;
+
private static int numSources = 0;
private String sourceInstanceID;
- private FaultTolerancyBuffer recordBuffer;
+ private Map recordBuffer;
public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList>();
partitioners = new LinkedList>();
userFunction = null;
+ numberOfOutputs = 0;
numSources++;
sourceInstanceID = Integer.toString(numSources);
+ recordBuffer = new TreeMap();
}
@@ -56,25 +62,26 @@ public class StreamSource extends AbstractInputTask {
return null;
}
- @Override
- public void registerInputOutput() {
- Configuration taskConfiguration = getTaskConfiguration();
- StreamComponentHelper streamSourceHelper = new StreamComponentHelper();
-
+ public void setUserFunction(Configuration taskConfiguration) {
+ Class extends UserSourceInvokable> userFunctionClass = taskConfiguration
+ .getClass("userfunction", DefaultSourceInvokable.class,
+ UserSourceInvokable.class);
try {
- streamSourceHelper.setConfigOutputs(this, taskConfiguration,
- outputs, partitioners);
+ userFunction = userFunctionClass.newInstance();
+ userFunction
+ .declareOutputs(outputs, sourceInstanceID, recordBuffer);
} catch (Exception e) {
- e.printStackTrace();
+
}
+ }
- recordBuffer = new FaultTolerancyBuffer(outputs, sourceInstanceID);
- userFunction = (UserSourceInvokable) streamSourceHelper
- .getUserFunction(taskConfiguration, outputs, sourceInstanceID,
- recordBuffer);
- streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID,
- outputs);
- streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID,
+ @Override
+ public void registerInputOutput() {
+ Configuration taskConfiguration = getTaskConfiguration();
+ numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
+ taskConfiguration, outputs, partitioners);
+ setUserFunction(taskConfiguration);
+ StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID,
outputs);
}
@@ -84,7 +91,7 @@ public class StreamSource extends AbstractInputTask {
userFunction.invoke();
System.out.println(this.getClass().getName() + "-" + sourceInstanceID);
- System.out.println(recordBuffer.getRecordBuffer());
+ System.out.println(recordBuffer.toString());
System.out.println("---------------------");
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java
similarity index 63%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java
index a18046c23fbb905c08b90f947880468a7da541d1..bfc0b58ebffa4f0f60a6e9e2da2e80d23b75de9f 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java
@@ -13,33 +13,35 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.api.streamcomponent;
+package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.streaming.api.AckEvent;
-import eu.stratosphere.streaming.api.FailEvent;
-import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
-import eu.stratosphere.streaming.api.StreamRecord;
+import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
+//TODO: Refactor, create common ancestor with StreamSource
public class StreamTask extends AbstractTask {
private List> inputs;
private List> outputs;
private List> partitioners;
private UserTaskInvokable userFunction;
+ private int numberOfInputs;
+ private int numberOfOutputs;
+
private static int numTasks = 0;
private String taskInstanceID = "";
-
- private FaultTolerancyBuffer recordBuffer;
+ private Map recordBuffer;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
@@ -47,31 +49,36 @@ public class StreamTask extends AbstractTask {
outputs = new LinkedList>();
partitioners = new LinkedList>();
userFunction = null;
+ numberOfInputs = 0;
+ numberOfOutputs = 0;
numTasks++;
taskInstanceID = Integer.toString(numTasks);
+ recordBuffer = new TreeMap();
+ }
+
+ public void setUserFunction(Configuration taskConfiguration) {
+ Class extends UserTaskInvokable> userFunctionClass = taskConfiguration
+ .getClass("userfunction", DefaultTaskInvokable.class,
+ UserTaskInvokable.class);
+ try {
+ userFunction = userFunctionClass.newInstance();
+ userFunction.declareOutputs(outputs, taskInstanceID, recordBuffer);
+ } catch (Exception e) {
+ }
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
- StreamComponentHelper streamTaskHelper = new StreamComponentHelper();
- try {
- streamTaskHelper.setConfigInputs(this, taskConfiguration, inputs);
- streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs,
- partitioners);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ numberOfInputs = StreamComponentFactory.setConfigInputs(this,
+ taskConfiguration, inputs);
+ numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
+ taskConfiguration, outputs, partitioners);
- recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID);
- userFunction = (UserTaskInvokable) streamTaskHelper
- .getUserFunction(taskConfiguration, outputs, taskInstanceID,
- recordBuffer);
- streamTaskHelper.setAckListener(recordBuffer, taskInstanceID,
- outputs);
- streamTaskHelper.setFailListener(recordBuffer, taskInstanceID,
+ setUserFunction(taskConfiguration);
+ StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID,
outputs);
}
@@ -84,20 +91,15 @@ public class StreamTask extends AbstractTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
- String id = streamRecord.getId();
+ String id = streamRecord.popId();
// TODO: Enclose invoke in try-catch to properly fail
// records
- try {
- userFunction.invoke(streamRecord.getRecord());
- System.out.println(this.getClass().getName() + "-"
- + taskInstanceID);
- System.out.println(recordBuffer.getRecordBuffer());
- System.out.println("---------------------");
- input.publishEvent(new AckEvent(id));
- } catch (Exception e) {
- input.publishEvent(new FailEvent(id));
- }
-
+ userFunction.invoke(streamRecord.getRecord());
+ System.out.println(this.getClass().getName() + "-"
+ + taskInstanceID);
+ System.out.println(recordBuffer.toString());
+ System.out.println("---------------------");
+ input.publishEvent(new AckEvent(id));
}
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java
index b23f4598d90cebddd6b9524e0c66b51ab4ab4653..3fc68fd214ba6d2c04b047978e465a1d8246812a 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java
@@ -1,8 +1,9 @@
package eu.stratosphere.streaming.api.invokable;
import java.util.List;
+import java.util.Map;
+
import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.Record;
@@ -11,10 +12,10 @@ public abstract class StreamInvokable {
private List> outputs;
protected String channelID;
- private FaultTolerancyBuffer emittedRecords;
+ private Map emittedRecords;
public final void declareOutputs(List> outputs,
- String channelID, FaultTolerancyBuffer emittedRecords) {
+ String channelID, Map emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
@@ -23,12 +24,12 @@ public abstract class StreamInvokable {
public final void emit(Record record) {
StreamRecord streamRecord = new StreamRecord(record, channelID).addId();
- emittedRecords.addRecord(streamRecord);
+ emittedRecords.put(streamRecord.getId(), streamRecord);
for (RecordWriter output : outputs) {
try {
- output.emit(streamRecord.getRecordWithId());
+ output.emit(streamRecord.getRecord());
System.out.println(this.getClass().getName());
System.out.println("Emitted " + streamRecord.getId() + "-"
@@ -37,7 +38,6 @@ public abstract class StreamInvokable {
} catch (Exception e) {
System.out.println("Emit error");
- emittedRecords.failRecord(streamRecord.getId());
}
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
deleted file mode 100644
index 1c57d3ee5db6d760fd1f08e7d7f61743ce7f7f07..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package eu.stratosphere.streaming.api.streamcomponent;
-
-import java.util.List;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.streaming.api.AckEvent;
-import eu.stratosphere.streaming.api.AckEventListener;
-import eu.stratosphere.streaming.api.FailEvent;
-import eu.stratosphere.streaming.api.FailEventListener;
-import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
-import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
-import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
-import eu.stratosphere.streaming.api.invokable.StreamInvokable;
-import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
-import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
-import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
-import eu.stratosphere.types.Key;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-
-public final class StreamComponentHelper {
-
- public void setAckListener(FaultTolerancyBuffer recordBuffer,
- String sourceInstanceID, List> outputs) {
- EventListener eventListener = new AckEventListener(sourceInstanceID,
- recordBuffer);
- for (RecordWriter output : outputs) {
- // TODO: separate outputs
- output.subscribeToEvent(eventListener, AckEvent.class);
- }
- }
-
- public void setFailListener(FaultTolerancyBuffer recordBuffer,
- String sourceInstanceID, List> outputs) {
- EventListener eventListener = new FailEventListener(sourceInstanceID,
- recordBuffer);
- for (RecordWriter output : outputs) {
- // TODO: separate outputs
- output.subscribeToEvent(eventListener, FailEvent.class);
- }
- }
-
- public void setConfigInputs(T taskBase, Configuration taskConfiguration,
- List> inputs) throws Exception {
- int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
- for (int i = 0; i < numberOfInputs; i++) {
- if (taskBase instanceof StreamTask) {
- inputs.add(new RecordReader((StreamTask) taskBase,
- Record.class));
- } else if (taskBase instanceof StreamSink) {
- inputs.add(new RecordReader((StreamSink) taskBase,
- Record.class));
- } else {
- throw new Exception(
- "Nonsupported object passed to setConfigInputs");
- }
- }
- }
-
- public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
- List> outputs,
- List> partitioners) throws Exception {
- int numberOfOutputs = taskConfiguration
- .getInteger("numberOfOutputs", 0);
- for (int i = 1; i <= numberOfOutputs; i++) {
- setPartitioner(taskConfiguration, i, partitioners);
- }
- for (ChannelSelector outputPartitioner : partitioners) {
- if (taskBase instanceof StreamTask) {
- outputs.add(new RecordWriter((StreamTask) taskBase,
- Record.class, outputPartitioner));
- } else if (taskBase instanceof StreamSource) {
- outputs.add(new RecordWriter((StreamSource) taskBase,
- Record.class, outputPartitioner));
- } else {
- throw new Exception(
- "Nonsupported object passed to setConfigOutputs");
- }
- }
- }
-
- public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
-
- Class extends UserSinkInvokable> userFunctionClass = taskConfiguration
- .getClass("userfunction", DefaultSinkInvokable.class,
- UserSinkInvokable.class);
- UserSinkInvokable userFunction = null;
-
- try {
- userFunction = userFunctionClass.newInstance();
- } catch (Exception e) {
-
- }
- return userFunction;
- }
-
- public StreamInvokable getUserFunction(Configuration taskConfiguration,
- List> outputs, String instanceID,
- FaultTolerancyBuffer recordBuffer) {
-
- // Default value is a TaskInvokable even if it was called from a source
- Class extends StreamInvokable> userFunctionClass = taskConfiguration
- .getClass("userfunction", DefaultTaskInvokable.class,
- StreamInvokable.class);
- StreamInvokable userFunction = null;
-
- try {
- userFunction = userFunctionClass.newInstance();
- userFunction.declareOutputs(outputs, instanceID, recordBuffer);
- } catch (Exception e) {
-
- }
- return userFunction;
- }
-
- private void setPartitioner(Configuration taskConfiguration, int nrOutput,
- List> partitioners) {
- Class extends ChannelSelector> partitioner = taskConfiguration
- .getClass("partitionerClass_" + nrOutput,
- DefaultPartitioner.class, ChannelSelector.class);
-
- try {
- if (partitioner.equals(FieldsPartitioner.class)) {
- int keyPosition = taskConfiguration.getInteger(
- "partitionerIntParam_" + nrOutput, 1);
- Class extends Key> keyClass = taskConfiguration.getClass(
- "partitionerClassParam_" + nrOutput, StringValue.class,
- Key.class);
-
- partitioners.add(partitioner.getConstructor(int.class,
- Class.class).newInstance(keyPosition, keyClass));
-
- } else {
- partitioners.add(partitioner.newInstance());
- }
- } catch (Exception e) {
- System.out.println("partitioner error" + " " + "partitioner_"
- + nrOutput);
- System.out.println(e);
- }
- }
-
-}
\ No newline at end of file