diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index 0f6f42f17eec9c18e5312ae315edfccd7acfcb4f..d8c29382c86eeaa869fe612ae4559f3ccd0c5e3e 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -144,7 +144,6 @@
-
org.apache.maven.plugins
maven-jar-plugin
@@ -171,7 +170,44 @@
once
-Xmx1024m
-
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.10
+
+
+ verify
+
+ check
+
+
+
+
+ false
+ 0
+
+
+ **/.*
+ **/*.prefs
+
+ **/resources/**
+
+ **/stratosphere-bin/conf/slaves
+
+ README.md
+ CHANGELOG
+ **/*.creole
+ CONTRIBUTORS
+
+ **/pom.xml
+ **/*.iml
+
+ **/target/**
+
+
+
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java
index eb8c252e3389ebf23bd049d7b7ef05d0a8f29a25..2cddae5bc377daa097448db5b0945802b9b8ac28 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java
@@ -24,6 +24,8 @@ import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
/**
* TaskEvent for sending record acknowledgements to the input's fault tolerance
* buffer
+ *
+ *
*/
public class AckEvent extends AbstractTaskEvent {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java
index 226091248a017a4bdb8810adf4d2bf7199bab88f..c49d6b699541ba1f56d1503147e14ea48f00d6bb 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEventListener.java
@@ -49,13 +49,15 @@ public class AckEventListener implements EventListener {
/**
* When an AckEvent occurs checks if it was directed at this task, if so,
* acknowledges the record given in the AckEvent
+ *
*/
public void eventOccurred(AbstractTaskEvent event) {
+
AckEvent ackEvent = (AckEvent) event;
String recordId = ackEvent.getRecordId();
- String ackChannelId = recordId.split("-", 2)[0];
+ String ackCID = recordId.split("-", 2)[0];
- if (ackChannelId.equals(taskInstanceID)) {
+ if (ackCID.equals(taskInstanceID)) {
Long nt = System.nanoTime();
recordBuffer.ackRecord(ackEvent.getRecordId());
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
index d454203a352d529480da8fe30d6a93aafadc6ad2..cf43e3b7cb73db5e654c01125316f3a188e844d1 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
@@ -39,19 +39,18 @@ public class FaultToleranceBuffer {
private static final Log log = LogFactory
.getLog(FaultToleranceBuffer.class);
- private long timeout = 10000;
+ private long TIMEOUT = 10000;
private Long timeOfLastUpdate;
private Map recordBuffer;
private Map ackCounter;
private Map ackMap;
private SortedMap> recordsByTime;
private Map recordTimestamps;
- private int numberofOutputs;
private List> outputs;
private final String channelID;
- private int numberOfChannels;
+ private int numberOfOutputs;
private int[] numberOfOutputChannels;
/**
@@ -65,6 +64,7 @@ public class FaultToleranceBuffer {
* @param numberOfChannels
* Number of output channels for the output components
*/
+
public FaultToleranceBuffer(List> outputs,
String channelID, int[] numberOfChannels) {
this.timeOfLastUpdate = System.currentTimeMillis();
@@ -72,14 +72,14 @@ public class FaultToleranceBuffer {
this.recordBuffer = new HashMap();
this.ackCounter = new HashMap();
this.ackMap = new HashMap();
- this.numberOfOutputChannels = numberOfChannels;
-
+ this.numberOfOutputChannels=numberOfChannels;
+
int totalChannels = 0;
for (int i : numberOfChannels)
totalChannels += i;
- this.numberofOutputs = numberOfOutputChannels.length;
- this.numberOfChannels = totalChannels;
+
+ this.numberOfOutputs = totalChannels;
this.channelID = channelID;
this.recordsByTime = new TreeMap>();
this.recordTimestamps = new HashMap();
@@ -88,21 +88,15 @@ public class FaultToleranceBuffer {
/**
* Adds the record to the fault tolerance buffer. This record will be
* monitored for acknowledgements and timeout.
+ *
*/
public void addRecord(StreamRecord streamRecord) {
String id = streamRecord.getId();
recordBuffer.put(id, streamRecord.copy());
- ackCounter.put(id, numberOfChannels);
-
- //TODO: remove comments for exactly once processing
-// int[] ackCounts = new int[numberOfChannels + 1];
-//
-// for (int i = 0; i < numberOfOutputChannels.length; i++) {
-// ackCounts[i + 1] = numberOfOutputChannels[i];
-// }
-//
-// ackMap.put(id, ackCounts);
-
+ ackCounter.put(id, numberOfOutputs);
+
+ ackMap.put(id,numberOfOutputChannels.clone());
+
addTimestamp(id);
log.trace("Record added to buffer: " + id);
}
@@ -117,11 +111,11 @@ public class FaultToleranceBuffer {
* @return Returns the list of the records that have timed out.
*/
List timeoutRecords(Long currentTime) {
- if (timeOfLastUpdate + timeout < currentTime) {
+ if (timeOfLastUpdate + TIMEOUT < currentTime) {
log.trace("Updating record buffer");
List timedOutRecords = new LinkedList();
Map> timedOut = recordsByTime.subMap(0L,
- currentTime - timeout);
+ currentTime - TIMEOUT);
for (Set recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
@@ -174,11 +168,15 @@ public class FaultToleranceBuffer {
*
* @param recordID
* The ID of the record that will be removed
+ *
*/
public StreamRecord removeRecord(String recordID) {
ackCounter.remove(recordID);
+
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
+
log.trace("Record removed from buffer: " + recordID);
+
return recordBuffer.remove(recordID);
}
@@ -192,7 +190,7 @@ public class FaultToleranceBuffer {
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
- Integer ackCount = ackCounter.get(recordID) - 1;
+ Integer ackCount = ackCounter.get(recordID)-1;
if (ackCount == 0) {
removeRecord(recordID);
} else {
@@ -208,25 +206,35 @@ public class FaultToleranceBuffer {
* @param recordID
* ID of the record that has been acknowledged
*
- * @param output
+ * @param outputChannel
* Number of the output channel that sent the ack
*/
- public void ackRecord(String recordID, int output) {
+ public void ackRecord(String recordID, int outputChannel) {
+
if (ackMap.containsKey(recordID)) {
- if (decreaseAckCounter(recordID, output)) {
+ int[] acks = ackMap.get(recordID);
+ acks[outputChannel]--;
+
+ if (allZero(acks)) {
removeRecord(recordID);
}
+
}
}
- private boolean decreaseAckCounter(String recordID, int output) {
- int[] acks = ackMap.get(recordID);
- acks[output + 1]--;
- if (acks[output + 1] == 0) {
- acks[0]++;
+ /**
+ * Checks whether an int array contains only zeros.
+ * @param values
+ * The array to check
+ * @return
+ * true only if the array contains only zeros
+ */
+ private static boolean allZero(int[] values) {
+ for (int value : values) {
+ if (value!=0)
+ return false;
}
-
- return (acks[0] == numberofOutputs);
+ return true;
}
/**
@@ -294,12 +302,12 @@ public class FaultToleranceBuffer {
}
- public long getTimeout() {
- return this.timeout;
+ public long getTIMEOUT() {
+ return this.TIMEOUT;
}
- public void setTimeout(long timeout) {
- this.timeout = timeout;
+ public void setTIMEOUT(long TIMEOUT) {
+ this.TIMEOUT = TIMEOUT;
}
public Map getRecordBuffer() {
@@ -331,11 +339,11 @@ public class FaultToleranceBuffer {
}
public int getNumberOfOutputs() {
- return this.numberOfChannels;
+ return this.numberOfOutputs;
}
void setNumberOfOutputs(int numberOfOutputs) {
- this.numberOfChannels = numberOfOutputs;
+ this.numberOfOutputs = numberOfOutputs;
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index 255d55a44f210ed5d687b6275b68b652ea6899df..28b669068df572fbee235aa981887b3d0f3b7830 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -48,6 +48,7 @@ import eu.stratosphere.types.Key;
/**
* Object for building Stratosphere stream processing job graphs
+ *
*/
public class JobGraphBuilder {
@@ -57,6 +58,7 @@ public class JobGraphBuilder {
private Map numberOfInstances;
private Map> numberOfOutputChannels;
+
/**
* Creates a new JobGraph with the given name
*
@@ -67,8 +69,9 @@ public class JobGraphBuilder {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap();
numberOfInstances = new HashMap();
- numberOfOutputChannels = new HashMap>();
+ numberOfOutputChannels=new HashMap>();
log.debug("JobGraph created");
+
}
/**
@@ -79,10 +82,13 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the source
*/
- public void setSource(String sourceName, final Class extends UserSourceInvokable> InvokableClass) {
+ public void setSource(String sourceName,
+ final Class extends UserSourceInvokable> InvokableClass) {
+
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
- Configuration config = new TaskConfig(source.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(source.getConfiguration())
+ .getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sourceName);
components.put(sourceName, source);
@@ -90,7 +96,6 @@ public class JobGraphBuilder {
log.debug("SOURCE: " + sourceName);
}
- // TODO: eliminate code repetition
/**
* Adds a source component to the JobGraph
*
@@ -101,11 +106,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
- public void setSource(String sourceName, final Class extends UserSourceInvokable> InvokableClass, int parallelism) {
+ public void setSource(String sourceName,
+ final Class extends UserSourceInvokable> InvokableClass,
+ int parallelism) {
+
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
source.setNumberOfSubtasks(parallelism);
- Configuration config = new TaskConfig(source.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(source.getConfiguration())
+ .getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sourceName);
components.put(sourceName, source);
@@ -121,10 +130,13 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the task
*/
- public void setTask(String taskName, final Class extends UserTaskInvokable> InvokableClass) {
+ public void setTask(String taskName,
+ final Class extends UserTaskInvokable> InvokableClass) {
+
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
- Configuration config = new TaskConfig(task.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(task.getConfiguration())
+ .getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", taskName);
components.put(taskName, task);
@@ -142,11 +154,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
- public void setTask(String taskName, final Class extends UserTaskInvokable> InvokableClass, int parallelism) {
+ public void setTask(String taskName,
+ final Class extends UserTaskInvokable> InvokableClass,
+ int parallelism) {
+
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
task.setNumberOfSubtasks(parallelism);
- Configuration config = new TaskConfig(task.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(task.getConfiguration())
+ .getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", taskName);
components.put(taskName, task);
@@ -162,10 +178,13 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the sink
*/
- public void setSink(String sinkName, final Class extends UserSinkInvokable> InvokableClass) {
+ public void setSink(String sinkName,
+ final Class extends UserSinkInvokable> InvokableClass) {
+
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
- Configuration config = new TaskConfig(sink.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(sink.getConfiguration())
+ .getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sinkName);
components.put(sinkName, sink);
@@ -183,12 +202,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
- public void setSink(String sinkName, final Class extends UserSinkInvokable> InvokableClass, int parallelism) {
+ public void setSink(String sinkName,
+ final Class extends UserSinkInvokable> InvokableClass,
+ int parallelism) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
sink.setNumberOfSubtasks(parallelism);
- Configuration config = new TaskConfig(sink.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(sink.getConfiguration())
+ .getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sinkName);
components.put(sinkName, sink);
@@ -210,22 +232,33 @@ public class JobGraphBuilder {
* @param channelType
* Channel Type
*/
- private void connect(String upStreamComponentName, String downStreamComponentName,
- Class extends ChannelSelector> PartitionerClass, ChannelType channelType) {
+ private void connect(String upStreamComponentName,
+ String downStreamComponentName,
+ Class extends ChannelSelector> PartitionerClass,
+ ChannelType channelType) {
- AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
- AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
+ AbstractJobVertex upStreamComponent = components
+ .get(upStreamComponentName);
+ AbstractJobVertex downStreamComponent = components
+ .get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, channelType);
- Configuration config = new TaskConfig(upStreamComponent.getConfiguration()).getConfiguration();
- config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
+ Configuration config = new TaskConfig(
+ upStreamComponent.getConfiguration()).getConfiguration();
+ config.setClass(
+ "partitionerClass_"
+ + (upStreamComponent.getNumberOfForwardConnections()-1),
PartitionerClass);
- log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - " + upStreamComponentName + " -> "
- + downStreamComponentName);
+ log.debug("CONNECTED: "
+ + PartitionerClass.getSimpleName() + " - "
+ + upStreamComponentName + " -> " + downStreamComponentName);
} catch (JobGraphDefinitionException e) {
- log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : "
- + upStreamComponentName + " -> " + downStreamComponentName, e);
+ log.error(
+ "Cannot connect components with "
+ + PartitionerClass.getSimpleName() + " : "
+ + upStreamComponentName + " -> "
+ + downStreamComponentName, e);
}
}
@@ -241,9 +274,16 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
- public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
- connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class, ChannelType.INMEMORY);
- addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
+ public void broadcastConnect(String upStreamComponentName,
+ String downStreamComponentName) {
+
+ connect(upStreamComponentName, downStreamComponentName,
+ BroadcastPartitioner.class, ChannelType.INMEMORY);
+
+ addOutputChannels(upStreamComponentName,numberOfInstances.get(downStreamComponentName));
+
+ // log.debug("Components connected with broadcast: " +
+ // upStreamComponentName + " to " + downStreamComponentName);
}
/**
@@ -263,32 +303,44 @@ public class JobGraphBuilder {
* @param keyClass
* Class of the key Value stored in the record
*/
- public void fieldsConnect(String upStreamComponentName, String downStreamComponentName, int keyPosition,
+ public void fieldsConnect(String upStreamComponentName,
+ String downStreamComponentName, int keyPosition,
Class extends Key> keyClass) {
- AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
- AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
+ AbstractJobVertex upStreamComponent = components
+ .get(upStreamComponentName);
+ AbstractJobVertex downStreamComponent = components
+ .get(downStreamComponentName);
try {
- upStreamComponent.connectTo(downStreamComponent, ChannelType.INMEMORY);
+ upStreamComponent.connectTo(downStreamComponent,
+ ChannelType.INMEMORY);
- Configuration config = new TaskConfig(upStreamComponent.getConfiguration()).getConfiguration();
+ Configuration config = new TaskConfig(
+ upStreamComponent.getConfiguration()).getConfiguration();
- config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
+ config.setClass(
+ "partitionerClass_"
+ + (upStreamComponent.getNumberOfForwardConnections()-1),
FieldsPartitioner.class);
- config.setClass("partitionerClassParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
+ config.setClass(
+ "partitionerClassParam_"
+ + (upStreamComponent.getNumberOfForwardConnections()-1),
keyClass);
- config.setInteger("partitionerIntParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
+ config.setInteger(
+ "partitionerIntParam_"
+ + (upStreamComponent.getNumberOfForwardConnections()-1),
keyPosition);
- addOutputChannels(upStreamComponentName, 1);
- log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> " + downStreamComponentName
- + ", KEY: " + keyPosition);
+ addOutputChannels(upStreamComponentName,1);
+ log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName
+ + " -> " + downStreamComponentName + ", KEY: "
+ + keyPosition);
} catch (JobGraphDefinitionException e) {
- log.error(
- "Cannot connect components by field: " + upStreamComponentName + " to " + downStreamComponentName,
+ log.error("Cannot connect components by field: "
+ + upStreamComponentName + " to " + downStreamComponentName,
e);
}
}
@@ -305,9 +357,14 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
- public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
- connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class, ChannelType.INMEMORY);
- addOutputChannels(upStreamComponentName, 1);
+ public void globalConnect(String upStreamComponentName,
+ String downStreamComponentName) {
+
+ connect(upStreamComponentName, downStreamComponentName,
+ GlobalPartitioner.class, ChannelType.INMEMORY);
+
+ addOutputChannels(upStreamComponentName,1);
+
}
/**
@@ -322,9 +379,13 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
- public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
- connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class, ChannelType.INMEMORY);
- addOutputChannels(upStreamComponentName, 1);
+ public void shuffleConnect(String upStreamComponentName,
+ String downStreamComponentName) {
+
+ connect(upStreamComponentName, downStreamComponentName,
+ ShufflePartitioner.class, ChannelType.INMEMORY);
+
+ addOutputChannels(upStreamComponentName,1);
}
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
@@ -338,25 +399,31 @@ public class JobGraphBuilder {
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
- component.getConfiguration().setInteger("numberOfInputs", component.getNumberOfBackwardConnections());
+ component.getConfiguration().setInteger("numberOfInputs",
+ component.getNumberOfBackwardConnections());
}
}
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
- component.getConfiguration().setInteger("numberOfOutputs", component.getNumberOfForwardConnections());
+ component.getConfiguration().setInteger("numberOfOutputs",
+ component.getNumberOfForwardConnections());
}
-
+
for (String component : numberOfOutputChannels.keySet()) {
Configuration config = components.get(component).getConfiguration();
- List channelNumList = numberOfOutputChannels.get(component);
- for (int i = 0; i < channelNumList.size(); i++) {
- config.setInteger("channels_" + i, channelNumList.get(i));
+
+ List channelNumList=numberOfOutputChannels.get(component);
+
+ for(int i=0;i {
return userFunction;
}
- // TODO find a better solution for this
+ // TODO: use TCP-like waiting
public void threadSafePublish(AbstractTaskEvent event,
RecordReader input) throws InterruptedException,
IOException {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java
index 2c10dc1054dbe2258d3df78e02fdd28e343de2ce..a08f98a36fdf3009f5fccad476401a13e9d8a82d 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java
@@ -44,6 +44,7 @@ public abstract class StreamInvokableComponent {
}
public final void emit(StreamRecord record) {
+
record.setId(channelID);
emittedRecords.addRecord(record);
try {
@@ -57,13 +58,19 @@ public abstract class StreamInvokableComponent {
}
}
- //TODO: Add fault tolerance
+ //TODO:Add fault tolreance
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
try {
+
outputs.get(outputChannel).emit(record);
+
} catch (Exception e) {
+
log.warn("EMIT ERROR: " + e.getMessage() + " -- " + name);
+
}
+
}
+
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
index 60cd321472e3cda29d99ecc41197688abe0b1e52..a6a07c19f86689e09be98a48852fcbf84433eab4 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
@@ -58,7 +58,6 @@ public class StreamSink extends AbstractOutputTask {
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
- //TODO: eliminate code repetition (StreamTask)
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
index 26973b848e407ca064a2e8070eab11e76d277379..11ae7c9f17a201e7ad1648aba3a73b10c8d0bc62 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
@@ -84,7 +84,6 @@ public class StreamTask extends AbstractTask {
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
}
- //TODO: eliminate code repetition (StreamSink)
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
@@ -97,6 +96,7 @@ public class StreamTask extends AbstractTask {
hasInput = true;
StreamRecord streamRecord = input.next();
String id = streamRecord.getId();
+ // TODO create method for concurrent publishing
try {
userFunction.invoke(streamRecord);
streamTaskHelper.threadSafePublish(new AckEvent(id), input);
@@ -108,7 +108,7 @@ public class StreamTask extends AbstractTask {
}
}
}
- log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
+ log.debug("TASK " + name + "invoke finished with instance id " + taskInstanceID);
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchFieldException.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchFieldException.java
index 7fad7f1e2aa7003d50dc101a668657def5f1f385..211dafaa8beb34565823119cfb33cb807d2a9f0d 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchFieldException.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchFieldException.java
@@ -16,6 +16,7 @@
package eu.stratosphere.streaming.api.streamrecord;
public class NoSuchFieldException extends StreamRecordException {
-
+
private static final long serialVersionUID = 3604681465275112784L;
+
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
index 073fb91b905e29843ba65fc4f310e4d02baec41c..d863e27b4524b5cb845e4ce2513af2020a92d522 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
@@ -105,7 +105,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return numOfRecords;
}
- // TODO: use UUID
/**
* Set the ID of the StreamRecord object
*
@@ -346,19 +345,18 @@ public class StreamRecord implements IOReadableWritable, Serializable {
// TODO: fix this method to work properly for non StringValue types
public String toString() {
- StringBuilder outputString = new StringBuilder("(");
+ StringBuilder outputString = new StringBuilder();
StringValue output;
for (int k = 0; k < numOfRecords; ++k) {
for (int i = 0; i < numOfFields; i++) {
try {
output = (StringValue) recordBatch.get(k)[i];
- outputString.append(output.getValue() + ",");
+ outputString.append(output.getValue() + "*");
} catch (ClassCastException e) {
- outputString.append("NON-STRING,");
+ outputString.append("NON-STRING*");
}
}
}
- outputString.append(")");
return outputString.toString();
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/BroadcastPartitioner.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/BroadcastPartitioner.java
index b539cd0224ec0e146fa797e61d4e79226605535b..02d7d0d7a795d38d64cf556c54f8d255b015ae01 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/BroadcastPartitioner.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/BroadcastPartitioner.java
@@ -22,10 +22,12 @@ public class BroadcastPartitioner implements ChannelSelector {
@Override
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
+
int[] returnChannels = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnChannels[i] = i;
}
return returnChannels;
}
+
}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/DefaultPartitioner.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/DefaultPartitioner.java
index 7b0843ea7e0161851f0da19a9dcb0b04676a555b..cffa4725f6df8f4a3ec393e84cf9d2d5c99b71c9 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/DefaultPartitioner.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/DefaultPartitioner.java
@@ -22,7 +22,10 @@ public class DefaultPartitioner implements ChannelSelector {
@Override
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
+
return new ShufflePartitioner().selectChannels(record,
numberOfOutputChannels);
+
}
+
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java
index 0a9b3ca7d580354ac6d8b16b278ea2a649cb6c84..cac23a673b3ac05bf757c9d8ffa75ad246bb5a39 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java
@@ -47,12 +47,19 @@ public class CellInfo {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", InfoSourceInvokable.class);
+ // graphBuilder.setSource("infoSource2", InfoSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
+ // graphBuilder.setSource("querySource2", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", CellTaskInvokable.class, 3);
graphBuilder.setSink("sink", CellSinkInvokable.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class);
- graphBuilder.fieldsConnect("querySource", "cellTask", 0, IntValue.class);
+ graphBuilder
+ .fieldsConnect("querySource", "cellTask", 0, IntValue.class);
+ // graphBuilder.fieldsConnect("infoSource2", "cellTask", 0,
+ // IntValue.class);
+ // graphBuilder.fieldsConnect("querySource2", "cellTask",0,
+ // IntValue.class);
graphBuilder.shuffleConnect("cellTask", "sink");
return graphBuilder.getJobGraph();
@@ -63,7 +70,8 @@ public class CellInfo {
NepheleMiniCluster exec = new NepheleMiniCluster();
try {
- File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
+ File file = new File(
+ "target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph();
@@ -76,7 +84,8 @@ public class CellInfo {
// 6498),
// configuration);
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
+ Client client = new Client(new InetSocketAddress(
+ "hadoop02.ilab.sztaki.hu", 6123), configuration);
exec.start();
client.run(jG, true);
exec.stop();
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellSinkInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellSinkInvokable.java
index 37576729437cd28c4313c75a5207c9ea38880c80..d2386dd9c8eb2d1c17bfc77bc1f7fcc7a265a215 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellSinkInvokable.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellSinkInvokable.java
@@ -20,16 +20,16 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CellSinkInvokable extends UserSinkInvokable {
- int counter = 0;
+ int c=0;
+ @Override
+ public void invoke(StreamRecord record) throws Exception {
- @Override
- public void invoke(StreamRecord record) throws Exception {
- counter++;
- }
-
- @Override
- public String getResult() {
- return String.valueOf(counter);
- }
+ c++;
+ }
+
+ @Override
+ public String getResult(){
+ return String.valueOf(c);
+ }
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellTaskInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellTaskInvokable.java
index 8e57421da5c440d74c1c626c5d7b55edacc67b9b..353b923a66d8c39b64bbf4bb1dcb69cfe89aebb2 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellTaskInvokable.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellTaskInvokable.java
@@ -40,8 +40,6 @@ public class CellTaskInvokable extends UserTaskInvokable {
timeStamp = (LongValue) record.getField(1);
numOfFields=record.getNumOfFields();
-
- //TODO: consider adding source to StreamRecord as a workaround
// INFO
if (numOfFields == 2) {
engine.put(cellID.getValue(), timeStamp.getValue());
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/IWorkerEngine.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/IWorkerEngine.java
index cae7f305dcdb669a320da2275f6d38aa5dde9a08..8c0e058f2b5d9dd0a490f643dc2bbad4d32fa145 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/IWorkerEngine.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/IWorkerEngine.java
@@ -17,5 +17,6 @@ package eu.stratosphere.streaming.test.cellinfo;
public interface IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId);
+
public void put(int cellId, long timeStamp);
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/InfoSourceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/InfoSourceInvokable.java
index 28bff801d2d60c59a5cc3bd2cdfb272f272d29ba..c7ed48c184b78eea66ea9e3e7ba43490f004661c 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/InfoSourceInvokable.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/InfoSourceInvokable.java
@@ -23,9 +23,9 @@ import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
public class InfoSourceInvokable extends UserSourceInvokable {
-
- Random rand = new Random();
- int cellNumber = 10;
+
+ Random _rand= new Random();
+ int _cellNumber=10;
private IntValue cellId = new IntValue(5);
private LongValue timeStamp = new LongValue(500);
@@ -34,11 +34,11 @@ public class InfoSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 500000; i++) {
- cellId.setValue(rand.nextInt(cellNumber));
+ cellId.setValue(_rand.nextInt(_cellNumber));
timeStamp.setValue(System.currentTimeMillis());
-
- record.setRecord(cellId, timeStamp);
-
+
+ record.setRecord(cellId,timeStamp);
+
emit(record);
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/QuerySourceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/QuerySourceInvokable.java
index dd3a4c8a4e50e093a7a8bd6dbb1df063cfcd44f3..4065329569d23a5b57a93c47e84961a58cf3b338 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/QuerySourceInvokable.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/QuerySourceInvokable.java
@@ -24,13 +24,13 @@ import eu.stratosphere.types.LongValue;
public class QuerySourceInvokable extends UserSourceInvokable {
- Random _rand = new Random();
- int _cellNumber = 10;
+ Random _rand= new Random();
+ int _cellNumber=10;
private IntValue cellId = new IntValue(5);
private LongValue timeStamp = new LongValue(500);
private IntValue lastMillis = new IntValue(100);
- private StreamRecord record = new StreamRecord(cellId, timeStamp, lastMillis);
+ private StreamRecord record = new StreamRecord(cellId, timeStamp,lastMillis);
@Override
public void invoke() throws Exception {
@@ -38,11 +38,12 @@ public class QuerySourceInvokable extends UserSourceInvokable {
Thread.sleep(1);
cellId.setValue(_rand.nextInt(_cellNumber));
timeStamp.setValue(System.currentTimeMillis());
-
- record.setRecord(cellId, timeStamp, lastMillis);
-
+
+ record.setRecord(cellId,timeStamp,lastMillis);
+
emit(record);
}
}
+
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/Util.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/Util.java
index c7487d6bbd5d15bc23105d344d1a4bf3916b3196..c4fcd24aafe7816775804a3cbb7faadd5c333a20 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/Util.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/Util.java
@@ -16,11 +16,11 @@
package eu.stratosphere.streaming.test.cellinfo;
public class Util {
- public static int mod(int x, int y) {
- int result = x % y;
- if (result < 0) {
- result += y;
- }
- return result;
- }
+ public static int mod(int x, int y) {
+ int result = x % y;
+ if (result < 0) {
+ result += y;
+ }
+ return result;
+ }
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineBin.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineBin.java
index e964cdf9070f269f24b1c8b8af9b7bd0e80d13fe..39943f013cfd9afa09ee797468d1c98e0401ffc7 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineBin.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineBin.java
@@ -25,7 +25,8 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
private int[][] counters_;
private int pointer_;
- public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval, long currentTime) {
+ public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval,
+ long currentTime) {
lastTimeUpdated_ = currentTime / unitLength * unitLength;
unitLength_ = unitLength;
counters_ = new int[(int) (bufferInterval / unitLength) + 1][numOfCells];
@@ -63,11 +64,15 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
int shift = refresh(timeStamp);
int numOfLastIntervals = (int) (lastMillis / unitLength_);
if (shift >= counters_.length || numOfLastIntervals >= counters_.length) {
+ // System.out.println(counters_.length);
+ // System.out.println(shift);
+ // System.out.println(numOfLastIntervals);
return -1;
}
int sum = 0;
for (int i = shift + 1; i < shift + numOfLastIntervals + 1; ++i) {
sum += getCell(i, cellId);
+ // System.out.println(i + " " + getCell(i, cellId));
}
return sum;
}
@@ -78,10 +83,12 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
int retVal;
if (shiftBy > 0) {
lastTimeUpdated_ = timeStamp / unitLength_ * unitLength_;
+ // System.out.println(lastTimeUpdated_);
retVal = 0;
} else {
retVal = -shiftBy;
}
+ // System.out.println("Shiftby " + shiftBy + " at " + timeStamp);
return retVal;
}
@@ -90,5 +97,7 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
if (shift >= counters_.length)
return;
incrCell(shift, cellId);
+ // System.out.println("Pointer:" + pointer_);
+ // System.out.println(getCell(shift, cellId));
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineExact.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineExact.java
index 1e8fcd31f1e5310f02c25221b529aa0b696a6eb8..1f3ac65df864eb4a6ff08e084755260fa2961d6b 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineExact.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/WorkerEngineExact.java
@@ -26,7 +26,8 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
private TreeMap[] counters_;
@SuppressWarnings("unchecked")
- public WorkerEngineExact(int numOfCells, int bufferInterval, long currentTime) {
+ public WorkerEngineExact(int numOfCells, int bufferInterval,
+ long currentTime) {
lastTimeUpdated_ = currentTime;
bufferInterval_ = bufferInterval;
counters_ = new TreeMap[numOfCells];
@@ -37,7 +38,8 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId) {
refresh(timeStamp);
- Map subMap = counters_[cellId].subMap(timeStamp - lastMillis, true, timeStamp, false);
+ Map subMap = counters_[cellId].subMap(timeStamp
+ - lastMillis, true, timeStamp, false);
int retVal = 0;
for (Map.Entry entry : subMap.entrySet()) {
retVal += entry.getValue();
@@ -48,6 +50,7 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
public void put(int cellId, long timeStamp) {
refresh(timeStamp);
TreeMap map = counters_[cellId];
+ // System.out.println(map.size());
if (map.containsKey(timeStamp)) {
map.put(timeStamp, map.get(timeStamp) + 1);
} else {
@@ -57,11 +60,16 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
public void refresh(long timeStamp) {
if (timeStamp - lastTimeUpdated_ > bufferInterval_) {
+ // System.out.println("Refresh at " + timeStamp);
for (int i = 0; i < counters_.length; ++i) {
- for (Iterator> it = counters_[i].entrySet().iterator(); it.hasNext();) {
+ for (Iterator> it = counters_[i]
+ .entrySet().iterator(); it.hasNext();) {
Map.Entry entry = it.next();
long time = entry.getKey();
if (timeStamp - time > bufferInterval_) {
+ // System.out.println("Remove: " + i + "_" + time +
+ // " at " +
+ // timeStamp);
it.remove();
} else {
break;
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java
index 7b9742b1941189cd1fb2fde268f906fcad329a0e..377873548c4d7f00ce18d7461512f8334bd07a04 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java
@@ -29,7 +29,6 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private String word = new String();
- private StreamRecord streamRecord = new StreamRecord();
private int count = 1;
private int i = 0;
private long time;
@@ -55,8 +54,6 @@ public class WordCountCounter extends UserTaskInvokable {
countValue.setValue(1);
}
// TODO: object reuse
-// streamRecord.addRecord(wordValue, countValue);
-// emit(streamRecord.copy());
emit(new StreamRecord(wordValue, countValue));
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource.java
index 6c84a0cf599a67cd6d79176d8b288ccdf10e4cfb..79b7f86d8237bee88274295ac3987f25016f46f4 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource.java
@@ -25,10 +25,12 @@ public class WordCountDummySource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(lineValue);
public WordCountDummySource() {
+
}
@Override
public void invoke() throws Exception {
+
for (int i = 0; i < 1000; i++) {
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
@@ -39,4 +41,4 @@ public class WordCountDummySource extends UserSourceInvokable {
emit(record);
}
}
-}
\ No newline at end of file
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource2.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource2.java
index 631af1f9e7ce19bf8e744d1ed7650650aa3f19df..236d5391b56b3adf1e5e6198fa2533d9dcb42b2e 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource2.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountDummySource2.java
@@ -25,8 +25,9 @@ public class WordCountDummySource2 extends UserSourceInvokable {
StreamRecord record = new StreamRecord(lineValue);
private long time;
private long prevTime = System.currentTimeMillis();
-
+
public WordCountDummySource2() {
+
}
@Override
@@ -34,11 +35,11 @@ public class WordCountDummySource2 extends UserSourceInvokable {
for (int i = 0; i < 1000000; i++) {
if (i % 50000 == 0) {
- time = System.currentTimeMillis();
- System.out.println("Source:\t\t" + i + "\t\tTime: " + (time - prevTime));
- prevTime = time;
+ time= System.currentTimeMillis();
+ System.out.println("Source:\t" + i + "\t----Time: "+(time-prevTime));
+ prevTime=time;
}
-
+
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
} else {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java
index 88614ba351f64022c3364d0c3c74d569e3180d41..bfc718057f762dba3d4e95b4446872276b55131c 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java
@@ -37,17 +37,18 @@ public class WordCountLocal {
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
- graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
+ graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
+ StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
-
- //TODO: arguments check
+
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
+
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
@@ -61,14 +62,18 @@ public class WordCountLocal {
exec.start();
- Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
+ Client client = new Client(new InetSocketAddress("localhost",
+ 6498), configuration);
+
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
+ Client client = new Client(new InetSocketAddress(
+ "hadoop02.ilab.sztaki.hu", 6123), configuration);
+
client.run(jG, true);
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java
index ffeef076c123198338a8faf810b04c32b74dacbf..db1b39564f34f482a860587d76572ed878dc073d 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java
@@ -39,7 +39,8 @@ public class WordCountRemote {
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
- graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
+ graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
+ StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
@@ -49,7 +50,9 @@ public class WordCountRemote {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
- File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
+
+ File file = new File(
+ "target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph();
@@ -57,8 +60,12 @@ public class WordCountRemote {
jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration();
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
+
+ Client client = new Client(new InetSocketAddress(
+ "hadoop02.ilab.sztaki.hu", 6123), configuration);
+
client.run(jG, true);
+
} catch (Exception e) {
System.out.println(e);
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java
index 58fd4bcc22f684c50eafd3af74a67448d028f72f..2bf6affe42f70bd2c962866254c144475a9e06bd 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java
@@ -20,23 +20,22 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
- int nrOfRecords = 0;
+ int nrOfRecords=0;
private long time;
private long prevTime = System.currentTimeMillis();
-
@Override
public void invoke(StreamRecord record) throws Exception {
nrOfRecords++;
if (nrOfRecords % 50000 == 0) {
- time = System.currentTimeMillis();
- System.out.println("Sink:\t" + nrOfRecords + "\t----Time: " + (time - prevTime));
- prevTime = time;
+ time= System.currentTimeMillis();
+ System.out.println("Sink:\t" + nrOfRecords + "\t----Time: "+(time-prevTime));
+ prevTime=time;
}
}
-
+
@Override
- public String getResult() {
+ public String getResult(){
return String.valueOf(nrOfRecords);
}
-
+
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java
index f3dcbac182ab14b325ee5ce89de79f3f84322a0e..23bf4b30f5dd3ba07d0789c8c37d4e3a7429a1f5 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java
@@ -33,7 +33,8 @@ public class WordCountSource extends UserSourceInvokable {
public WordCountSource() {
try {
- br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
+ br = new BufferedReader(new FileReader(
+ "src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java
index c273cef92d4c0d9eba39be041451f38fe71cd308..86e995dfaa28714666ee92c3a10454465667905f 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java
@@ -34,8 +34,9 @@ public class WordCountSplitter extends UserTaskInvokable {
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
- System.out.println("Splitter:\t" + i + "\t----Time: " + (time - prevTime));
- prevTime = time;
+ System.out.println("Splitter:\t" + i + "\t----Time: "
+ + (time - prevTime));
+ prevTime=time;
}
sentence = (StringValue) record.getRecord(0)[0];
words = sentence.getValue().split(" ");
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java
index cef57e4217f63749ee81e3dc53898851ac0bb76e..cfe5c34124868f8456ea697413cc0c06885b9ad0 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java
@@ -230,7 +230,7 @@ public class FaultToleranceBufferTest {
// TODO: create more tests for this method
@Test
public void testTimeOutRecords() {
- faultTolerancyBuffer.setTimeout(1000);
+ faultTolerancyBuffer.setTIMEOUT(1000);
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new StringValue("V1"));
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java
index 94e4a6b61850c4e0bc8d242dd5150d4d320798d1..e5459bc5ede25f0f66458d7db05a41fa23587a36 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java
@@ -12,7 +12,6 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
-
package eu.stratosphere.streaming.api.streamcomponent;
import static org.junit.Assert.assertEquals;
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
index 4cfdca72c56fd569249d0ad6357a466a92407da3..1f7c83f5556b1a1f7a0bc33c0cde9d8593d7c200 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
@@ -32,17 +32,20 @@ public class StreamRecordTest {
@Test
public void singleRecordSetGetTest() {
- StreamRecord record = new StreamRecord(new StringValue("Stratosphere"), new IntValue(1));
+ StreamRecord record = new StreamRecord(new StringValue("Stratosphere"),
+ new IntValue(1));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
- assertEquals("Stratosphere", ((StringValue) record.getField(0)).getValue());
+ assertEquals("Stratosphere",
+ ((StringValue) record.getField(0)).getValue());
assertEquals(1, ((IntValue) record.getField(1)).getValue());
record.setField(1, new StringValue("Big Data"));
assertEquals("Big Data", ((StringValue) record.getField(1)).getValue());
- record.setRecord(new IntValue(2), new StringValue("Big Data looks tiny from here."));
+ record.setRecord(new IntValue(2), new StringValue(
+ "Big Data looks tiny from here."));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(2, ((IntValue) record.getField(0)).getValue());
@@ -79,9 +82,11 @@ public class StreamRecordTest {
public void copyTest() {
StreamRecord a = new StreamRecord(new StringValue("Big"));
StreamRecord b = a.copy();
- assertTrue(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
+ assertTrue(((StringValue) a.getField(0)).getValue().equals(
+ ((StringValue) b.getField(0)).getValue()));
b.setRecord(new StringValue("Data"));
- assertFalse(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
+ assertFalse(((StringValue) a.getField(0)).getValue().equals(
+ ((StringValue) b.getField(0)).getValue()));
}
@Test
@@ -94,7 +99,8 @@ public class StreamRecordTest {
}
try {
- a.setRecord(new StringValue("Data"), new StringValue("Stratosphere"));
+ a.setRecord(new StringValue("Data"),
+ new StringValue("Stratosphere"));
fail();
} catch (RecordSizeMismatchException e) {
}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/BroadcastPartitionerTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/BroadcastPartitionerTest.java
index 6e31f1706afbcb1abc962b5eb2692800ba021ede..75bf3ac683cc82d296e3aabea961ceb0d9fdfef0 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/BroadcastPartitionerTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/BroadcastPartitionerTest.java
@@ -1,18 +1,3 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
@@ -38,7 +23,7 @@ public class BroadcastPartitionerTest {
int[] first = new int[] { 0 };
int[] second = new int[] { 0, 1 };
int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-
+
assertArrayEquals(first, broadcastPartitioner.selectChannels(streamRecord, 1));
assertArrayEquals(second, broadcastPartitioner.selectChannels(streamRecord, 2));
assertArrayEquals(sixth, broadcastPartitioner.selectChannels(streamRecord, 6));
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/DefaultPartitionerTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/DefaultPartitionerTest.java
index 79ba6e0060dbbcca9ba87f1ea338064f874e1bc2..6f90391269df80777a65b039b186704183006dae 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/DefaultPartitionerTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/DefaultPartitionerTest.java
@@ -1,18 +1,3 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertEquals;
@@ -37,9 +22,12 @@ public class DefaultPartitionerTest {
@Test
public void testSelectChannelsLength() {
- assertEquals(1, defaultPartitioner.selectChannels(streamRecord, 1).length);
- assertEquals(1, defaultPartitioner.selectChannels(streamRecord, 2).length);
- assertEquals(1, defaultPartitioner.selectChannels(streamRecord, 1024).length);
+ assertEquals(1,
+ defaultPartitioner.selectChannels(streamRecord, 1).length);
+ assertEquals(1,
+ defaultPartitioner.selectChannels(streamRecord, 2).length);
+ assertEquals(1,
+ defaultPartitioner.selectChannels(streamRecord, 1024).length);
}
@Test
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/FieldsPartitionerTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/FieldsPartitionerTest.java
index 20c8c45d6ab15e72021e7184f83fcdaa8a9b017c..dd2c42d2cdfd6a3c35fe7daf8c4c0a2ca366bc6d 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/FieldsPartitionerTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/FieldsPartitionerTest.java
@@ -1,18 +1,3 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/GlobalPartitionerTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/GlobalPartitionerTest.java
index 2aad1f4e9d6387555a35a2979571ee2640e3fa46..d66629f37c3b9039e59b7c8849755de730820b7c 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/GlobalPartitionerTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/GlobalPartitionerTest.java
@@ -1,18 +1,3 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/ShufflePartitionerTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/ShufflePartitionerTest.java
index e105384f71bb8e9386a80fb2fdf10d7edbf87529..130b804b4f9a1e27cc897091ddf93bbc94388815 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/ShufflePartitionerTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/partitioner/ShufflePartitionerTest.java
@@ -1,18 +1,3 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertEquals;