From f660d0583ac63ce7e4202bab45315677d8150075 Mon Sep 17 00:00:00 2001 From: gaborhermann Date: Mon, 14 Jul 2014 16:28:56 +0200 Subject: [PATCH] [streaming] Implemented thread safe event publishing --- .../streaming/api/FaultTolerancyBuffer.java | 2 +- ...actory.java => StreamComponentHelper.java} | 114 +++++++++--------- .../api/streamcomponent/StreamSink.java | 36 ++---- .../api/streamcomponent/StreamSource.java | 21 ++-- .../api/streamcomponent/StreamTask.java | 47 +++----- 5 files changed, 97 insertions(+), 123 deletions(-) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/{StreamComponentFactory.java => StreamComponentHelper.java} (62%) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java index 73f81a8d68d..8667fa11991 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java @@ -15,7 +15,6 @@ import eu.stratosphere.types.Record; public class FaultTolerancyBuffer { private final static long TIMEOUT = 1000; - private final static long TIMESTAMP_STORE_WINDOW = TIMEOUT / 10; private Long timeOfLastUpdate; private Map recordBuffer; @@ -47,6 +46,7 @@ public class FaultTolerancyBuffer { addTimestamp(streamRecord.getId()); } + //TODO: use this method! private void timeoutRecords() { Long currentTime = System.currentTimeMillis(); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentFactory.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java similarity index 62% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentFactory.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 5ff81ad9ad4..042ef7710bb 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentFactory.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -1,12 +1,15 @@ package eu.stratosphere.streaming.api.streamcomponent; import java.util.List; +import java.util.Random; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.event.task.AbstractTaskEvent; import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.streaming.api.AckEvent; import eu.stratosphere.streaming.api.AckEventListener; import eu.stratosphere.streaming.api.FailEvent; @@ -22,9 +25,10 @@ import eu.stratosphere.types.Key; import eu.stratosphere.types.Record; import eu.stratosphere.types.StringValue; -public final class StreamComponentFactory { - - public static void setAckListener(FaultTolerancyBuffer recordBuffer, +public final class StreamComponentHelper { + private Random random = new Random(); + + public void setAckListener(FaultTolerancyBuffer recordBuffer, String sourceInstanceID, List> outputs) { EventListener eventListener = new AckEventListener(sourceInstanceID, recordBuffer); @@ -33,8 +37,8 @@ public final class StreamComponentFactory { output.subscribeToEvent(eventListener, AckEvent.class); } } - - public static void setFailListener(FaultTolerancyBuffer recordBuffer, + + public void setFailListener(FaultTolerancyBuffer recordBuffer, String sourceInstanceID, List> outputs) { EventListener eventListener = new FailEventListener(sourceInstanceID, recordBuffer); @@ -44,73 +48,52 @@ public final class StreamComponentFactory { } } - // for StreamTask - public static int setConfigInputs(StreamTask taskBase, - Configuration taskConfiguration, List> inputs) { - int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); - for (int i = 0; i < numberOfInputs; i++) { - inputs.add(new RecordReader(taskBase, Record.class)); - } - return numberOfInputs; - } - - // this function can be removed as duplication of the above function if - // modification on kernel is allowed. - // for StreamSink - public static int setConfigInputs(StreamSink taskBase, - Configuration taskConfiguration, List> inputs) { + public void setConfigInputs(T taskBase, Configuration taskConfiguration, + List> inputs) throws Exception { int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); for (int i = 0; i < numberOfInputs; i++) { - inputs.add(new RecordReader(taskBase, Record.class)); + if (taskBase instanceof StreamTask) { + inputs.add(new RecordReader((StreamTask) taskBase, + Record.class)); + } else if (taskBase instanceof StreamSink) { + inputs.add(new RecordReader((StreamSink) taskBase, + Record.class)); + } else { + throw new Exception( + "Nonsupported object passed to setConfigInputs"); + } } - return numberOfInputs; } - // for StreamTask - public static int setConfigOutputs(StreamTask taskBase, - Configuration taskConfiguration, + public void setConfigOutputs(T taskBase, Configuration taskConfiguration, List> outputs, - List> partitioners) { + List> partitioners) throws Exception { int numberOfOutputs = taskConfiguration .getInteger("numberOfOutputs", 0); for (int i = 1; i <= numberOfOutputs; i++) { - StreamComponentFactory.setPartitioner(taskConfiguration, i, - partitioners); + setPartitioner(taskConfiguration, i, partitioners); } for (ChannelSelector outputPartitioner : partitioners) { - outputs.add(new RecordWriter(taskBase, Record.class, - outputPartitioner)); + if (taskBase instanceof StreamTask) { + outputs.add(new RecordWriter((StreamTask) taskBase, + Record.class, outputPartitioner)); + } else if (taskBase instanceof StreamSource) { + outputs.add(new RecordWriter((StreamSource) taskBase, + Record.class, outputPartitioner)); + } else { + throw new Exception( + "Nonsupported object passed to setConfigOutputs"); + } } - return numberOfOutputs; } - // this function can be removed as duplication of the above function if - // modification on kernel is allowed. - // for StreamSource - public static int setConfigOutputs(StreamSource taskBase, - Configuration taskConfiguration, - List> outputs, - List> partitioners) { - int numberOfOutputs = taskConfiguration - .getInteger("numberOfOutputs", 0); - for (int i = 1; i <= numberOfOutputs; i++) { - StreamComponentFactory.setPartitioner(taskConfiguration, i, - partitioners); - } - for (ChannelSelector outputPartitioner : partitioners) { - outputs.add(new RecordWriter(taskBase, Record.class, - outputPartitioner)); - } - return numberOfOutputs; - } + public UserSinkInvokable getUserFunction(Configuration taskConfiguration) { - public static UserSinkInvokable setUserFunction(Configuration taskConfiguration) { - Class userFunctionClass = taskConfiguration .getClass("userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class); UserSinkInvokable userFunction = null; - + try { userFunction = userFunctionClass.newInstance(); } catch (Exception e) { @@ -118,13 +101,12 @@ public final class StreamComponentFactory { } return userFunction; } - - public static StreamInvokable setUserFunction( - Configuration taskConfiguration, + + public StreamInvokable getUserFunction(Configuration taskConfiguration, List> outputs, String instanceID, FaultTolerancyBuffer recordBuffer) { - //Default value is a TaskInvokable even if it was called from a source + // Default value is a TaskInvokable even if it was called from a source Class userFunctionClass = taskConfiguration .getClass("userfunction", DefaultTaskInvokable.class, StreamInvokable.class); @@ -139,8 +121,21 @@ public final class StreamComponentFactory { return userFunction; } - public static void setPartitioner(Configuration taskConfiguration, - int nrOutput, List> partitioners) { + //TODO: use TCP-like waiting + public void threadSafePublish (AbstractTaskEvent e, RecordReader input) throws InterruptedException { + boolean concurrentModificationOccured = false; + while (!concurrentModificationOccured) { + try { + input.publishEvent(e); + concurrentModificationOccured = true; + } catch (Exception exeption) { + Thread.sleep(random.nextInt(50)); + } + } + } + + private void setPartitioner(Configuration taskConfiguration, int nrOutput, + List> partitioners) { Class> partitioner = taskConfiguration .getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class); @@ -165,4 +160,5 @@ public final class StreamComponentFactory { System.out.println(e); } } + } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java index 44dd3ae0da7..684b635f8ca 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java @@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent; import java.util.LinkedList; import java.util.List; -import java.util.Random; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.io.RecordReader; @@ -32,20 +31,25 @@ public class StreamSink extends AbstractOutputTask { private List> inputs; private UserSinkInvokable userFunction; - - private Random rnd = new Random(); + private StreamComponentHelper streamSinkHelper; public StreamSink() { // TODO: Make configuration file visible and call setClassInputs() here inputs = new LinkedList>(); userFunction = null; + streamSinkHelper = new StreamComponentHelper(); } @Override public void registerInputOutput() { Configuration taskConfiguration = getTaskConfiguration(); - StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs); - userFunction = StreamComponentFactory.setUserFunction(taskConfiguration); + + try { + streamSinkHelper.setConfigInputs(this, taskConfiguration, inputs); + } catch (Exception e) { + e.printStackTrace(); + } + userFunction = streamSinkHelper.getUserFunction(taskConfiguration); } @Override @@ -60,26 +64,10 @@ public class StreamSink extends AbstractOutputTask { String id = rec.getId(); try { userFunction.invoke(rec.getRecord()); - //TODO create concurrent publish method - boolean concurrentModificationOccured = false; - while (!concurrentModificationOccured) { - try { - input.publishEvent(new AckEvent(id)); - concurrentModificationOccured = true; - } catch (Exception f) { - Thread.sleep(rnd.nextInt(50)); - } - } + streamSinkHelper.threadSafePublish(new AckEvent(id), input); } catch (Exception e) { - boolean concurrentModificationOccured = false; - while (!concurrentModificationOccured) { - try { - input.publishEvent(new FailEvent(id)); - concurrentModificationOccured = true; - } catch (Exception f) { - Thread.sleep(rnd.nextInt(50)); - } - } + streamSinkHelper.threadSafePublish(new FailEvent(id), input); + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java index 6a213c3e1df..1f51d674d7f 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java @@ -59,15 +59,22 @@ public class StreamSource extends AbstractInputTask { @Override public void registerInputOutput() { Configuration taskConfiguration = getTaskConfiguration(); - StreamComponentFactory.setConfigOutputs(this, taskConfiguration, - outputs, partitioners); - recordBuffer = new FaultTolerancyBuffer(outputs,sourceInstanceID); - userFunction = (UserSourceInvokable) StreamComponentFactory - .setUserFunction(taskConfiguration, outputs, sourceInstanceID, + StreamComponentHelper streamSourceHelper = new StreamComponentHelper(); + + try { + streamSourceHelper.setConfigOutputs(this, taskConfiguration, + outputs, partitioners); + } catch (Exception e) { + e.printStackTrace(); + } + + recordBuffer = new FaultTolerancyBuffer(outputs, sourceInstanceID); + userFunction = (UserSourceInvokable) streamSourceHelper + .getUserFunction(taskConfiguration, outputs, sourceInstanceID, recordBuffer); - StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID, + streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs); - StreamComponentFactory.setFailListener(recordBuffer, sourceInstanceID, + streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java index 8e7773a3e88..1d8c2b6be00 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java @@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent; import java.util.LinkedList; import java.util.List; -import java.util.Random; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.io.ChannelSelector; @@ -39,8 +38,7 @@ public class StreamTask extends AbstractTask { private UserTaskInvokable userFunction; private static int numTasks = 0; private String taskInstanceID = ""; - - private Random rnd = new Random(); + StreamComponentHelper streamTaskHelper; private FaultTolerancyBuffer recordBuffer; @@ -52,24 +50,26 @@ public class StreamTask extends AbstractTask { userFunction = null; numTasks++; taskInstanceID = Integer.toString(numTasks); - + streamTaskHelper = new StreamComponentHelper(); } @Override public void registerInputOutput() { Configuration taskConfiguration = getTaskConfiguration(); - StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs); - StreamComponentFactory.setConfigOutputs(this, taskConfiguration, outputs, - partitioners); + try { + streamTaskHelper.setConfigInputs(this, taskConfiguration, inputs); + streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs, + partitioners); + } catch (Exception e) { + e.printStackTrace(); + } recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID); - userFunction = (UserTaskInvokable) StreamComponentFactory.setUserFunction( + userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction( taskConfiguration, outputs, taskInstanceID, recordBuffer); - StreamComponentFactory - .setAckListener(recordBuffer, taskInstanceID, outputs); - StreamComponentFactory.setFailListener(recordBuffer, taskInstanceID, - outputs); + streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs); + streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs); } @Override @@ -82,29 +82,12 @@ public class StreamTask extends AbstractTask { hasInput = true; StreamRecord streamRecord = new StreamRecord(input.next()); String id = streamRecord.getId(); - //TODO create method for concurrent publishing + // TODO create method for concurrent publishing try { userFunction.invoke(streamRecord.getRecord()); - - boolean concurrentModificationOccured = false; - while (!concurrentModificationOccured) { - try { - input.publishEvent(new AckEvent(id)); - concurrentModificationOccured = true; - } catch (Exception e) { - Thread.sleep(rnd.nextInt(50)); - } - } + streamTaskHelper.threadSafePublish(new AckEvent(id), input); } catch (Exception e) { - boolean concurrentModificationOccured = false; - while (!concurrentModificationOccured) { - try { - input.publishEvent(new FailEvent(id)); - concurrentModificationOccured = true; - } catch (Exception f) { - Thread.sleep(rnd.nextInt(50)); - } - } + streamTaskHelper.threadSafePublish(new FailEvent(id), input); } } } -- GitLab