提交 f660d058 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Implemented thread safe event publishing

上级 b1071d8e
...@@ -15,7 +15,6 @@ import eu.stratosphere.types.Record; ...@@ -15,7 +15,6 @@ import eu.stratosphere.types.Record;
public class FaultTolerancyBuffer { public class FaultTolerancyBuffer {
private final static long TIMEOUT = 1000; private final static long TIMEOUT = 1000;
private final static long TIMESTAMP_STORE_WINDOW = TIMEOUT / 10;
private Long timeOfLastUpdate; private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer; private Map<String, StreamRecord> recordBuffer;
...@@ -47,6 +46,7 @@ public class FaultTolerancyBuffer { ...@@ -47,6 +46,7 @@ public class FaultTolerancyBuffer {
addTimestamp(streamRecord.getId()); addTimestamp(streamRecord.getId());
} }
//TODO: use this method!
private void timeoutRecords() { private void timeoutRecords() {
Long currentTime = System.currentTimeMillis(); Long currentTime = System.currentTimeMillis();
......
package eu.stratosphere.streaming.api.streamcomponent; package eu.stratosphere.streaming.api.streamcomponent;
import java.util.List; import java.util.List;
import java.util.Random;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.streaming.api.AckEvent; import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.AckEventListener; import eu.stratosphere.streaming.api.AckEventListener;
import eu.stratosphere.streaming.api.FailEvent; import eu.stratosphere.streaming.api.FailEvent;
...@@ -22,9 +25,10 @@ import eu.stratosphere.types.Key; ...@@ -22,9 +25,10 @@ import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
public final class StreamComponentFactory { public final class StreamComponentHelper<T extends AbstractInvokable> {
private Random random = new Random();
public static void setAckListener(FaultTolerancyBuffer recordBuffer,
public void setAckListener(FaultTolerancyBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) { String sourceInstanceID, List<RecordWriter<Record>> outputs) {
EventListener eventListener = new AckEventListener(sourceInstanceID, EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer); recordBuffer);
...@@ -33,8 +37,8 @@ public final class StreamComponentFactory { ...@@ -33,8 +37,8 @@ public final class StreamComponentFactory {
output.subscribeToEvent(eventListener, AckEvent.class); output.subscribeToEvent(eventListener, AckEvent.class);
} }
} }
public static void setFailListener(FaultTolerancyBuffer recordBuffer, public void setFailListener(FaultTolerancyBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) { String sourceInstanceID, List<RecordWriter<Record>> outputs) {
EventListener eventListener = new FailEventListener(sourceInstanceID, EventListener eventListener = new FailEventListener(sourceInstanceID,
recordBuffer); recordBuffer);
...@@ -44,73 +48,52 @@ public final class StreamComponentFactory { ...@@ -44,73 +48,52 @@ public final class StreamComponentFactory {
} }
} }
// for StreamTask public void setConfigInputs(T taskBase, Configuration taskConfiguration,
public static int setConfigInputs(StreamTask taskBase, List<RecordReader<Record>> inputs) throws Exception {
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(taskBase, Record.class));
}
return numberOfInputs;
}
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
// for StreamSink
public static int setConfigInputs(StreamSink taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) { for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(taskBase, Record.class)); if (taskBase instanceof StreamTask) {
inputs.add(new RecordReader<Record>((StreamTask) taskBase,
Record.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new RecordReader<Record>((StreamSink) taskBase,
Record.class));
} else {
throw new Exception(
"Nonsupported object passed to setConfigInputs");
}
} }
return numberOfInputs;
} }
// for StreamTask public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
public static int setConfigOutputs(StreamTask taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs, List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) { List<ChannelSelector<Record>> partitioners) throws Exception {
int numberOfOutputs = taskConfiguration int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0); .getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) { for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i, setPartitioner(taskConfiguration, i, partitioners);
partitioners);
} }
for (ChannelSelector<Record> outputPartitioner : partitioners) { for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class, if (taskBase instanceof StreamTask) {
outputPartitioner)); outputs.add(new RecordWriter<Record>((StreamTask) taskBase,
Record.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<Record>((StreamSource) taskBase,
Record.class, outputPartitioner));
} else {
throw new Exception(
"Nonsupported object passed to setConfigOutputs");
}
} }
return numberOfOutputs;
} }
// this function can be removed as duplication of the above function if public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
// modification on kernel is allowed.
// for StreamSource
public static int setConfigOutputs(StreamSource taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
outputPartitioner));
}
return numberOfOutputs;
}
public static UserSinkInvokable setUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSinkInvokable.class, .getClass("userfunction", DefaultSinkInvokable.class,
UserSinkInvokable.class); UserSinkInvokable.class);
UserSinkInvokable userFunction = null; UserSinkInvokable userFunction = null;
try { try {
userFunction = userFunctionClass.newInstance(); userFunction = userFunctionClass.newInstance();
} catch (Exception e) { } catch (Exception e) {
...@@ -118,13 +101,12 @@ public final class StreamComponentFactory { ...@@ -118,13 +101,12 @@ public final class StreamComponentFactory {
} }
return userFunction; return userFunction;
} }
public static StreamInvokable setUserFunction( public StreamInvokable getUserFunction(Configuration taskConfiguration,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs, String instanceID, List<RecordWriter<Record>> outputs, String instanceID,
FaultTolerancyBuffer recordBuffer) { FaultTolerancyBuffer recordBuffer) {
//Default value is a TaskInvokable even if it was called from a source // Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokable> userFunctionClass = taskConfiguration Class<? extends StreamInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class, .getClass("userfunction", DefaultTaskInvokable.class,
StreamInvokable.class); StreamInvokable.class);
...@@ -139,8 +121,21 @@ public final class StreamComponentFactory { ...@@ -139,8 +121,21 @@ public final class StreamComponentFactory {
return userFunction; return userFunction;
} }
public static void setPartitioner(Configuration taskConfiguration, //TODO: use TCP-like waiting
int nrOutput, List<ChannelSelector<Record>> partitioners) { public void threadSafePublish (AbstractTaskEvent e, RecordReader<Record> input) throws InterruptedException {
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
input.publishEvent(e);
concurrentModificationOccured = true;
} catch (Exception exeption) {
Thread.sleep(random.nextInt(50));
}
}
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, .getClass("partitionerClass_" + nrOutput,
DefaultPartitioner.class, ChannelSelector.class); DefaultPartitioner.class, ChannelSelector.class);
...@@ -165,4 +160,5 @@ public final class StreamComponentFactory { ...@@ -165,4 +160,5 @@ public final class StreamComponentFactory {
System.out.println(e); System.out.println(e);
} }
} }
} }
\ No newline at end of file
...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent; ...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordReader;
...@@ -32,20 +31,25 @@ public class StreamSink extends AbstractOutputTask { ...@@ -32,20 +31,25 @@ public class StreamSink extends AbstractOutputTask {
private List<RecordReader<Record>> inputs; private List<RecordReader<Record>> inputs;
private UserSinkInvokable userFunction; private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private Random rnd = new Random();
public StreamSink() { public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here // TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>(); inputs = new LinkedList<RecordReader<Record>>();
userFunction = null; userFunction = null;
streamSinkHelper = new StreamComponentHelper<StreamSink>();
} }
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration(); Configuration taskConfiguration = getTaskConfiguration();
StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs);
userFunction = StreamComponentFactory.setUserFunction(taskConfiguration); try {
streamSinkHelper.setConfigInputs(this, taskConfiguration, inputs);
} catch (Exception e) {
e.printStackTrace();
}
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
} }
@Override @Override
...@@ -60,26 +64,10 @@ public class StreamSink extends AbstractOutputTask { ...@@ -60,26 +64,10 @@ public class StreamSink extends AbstractOutputTask {
String id = rec.getId(); String id = rec.getId();
try { try {
userFunction.invoke(rec.getRecord()); userFunction.invoke(rec.getRecord());
//TODO create concurrent publish method streamSinkHelper.threadSafePublish(new AckEvent(id), input);
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
input.publishEvent(new AckEvent(id));
concurrentModificationOccured = true;
} catch (Exception f) {
Thread.sleep(rnd.nextInt(50));
}
}
} catch (Exception e) { } catch (Exception e) {
boolean concurrentModificationOccured = false; streamSinkHelper.threadSafePublish(new FailEvent(id), input);
while (!concurrentModificationOccured) {
try {
input.publishEvent(new FailEvent(id));
concurrentModificationOccured = true;
} catch (Exception f) {
Thread.sleep(rnd.nextInt(50));
}
}
} }
} }
......
...@@ -59,15 +59,22 @@ public class StreamSource extends AbstractInputTask<RandIS> { ...@@ -59,15 +59,22 @@ public class StreamSource extends AbstractInputTask<RandIS> {
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration(); Configuration taskConfiguration = getTaskConfiguration();
StreamComponentFactory.setConfigOutputs(this, taskConfiguration, StreamComponentHelper<StreamSource> streamSourceHelper = new StreamComponentHelper<StreamSource>();
outputs, partitioners);
recordBuffer = new FaultTolerancyBuffer(outputs,sourceInstanceID); try {
userFunction = (UserSourceInvokable) StreamComponentFactory streamSourceHelper.setConfigOutputs(this, taskConfiguration,
.setUserFunction(taskConfiguration, outputs, sourceInstanceID, outputs, partitioners);
} catch (Exception e) {
e.printStackTrace();
}
recordBuffer = new FaultTolerancyBuffer(outputs, sourceInstanceID);
userFunction = (UserSourceInvokable) streamSourceHelper
.getUserFunction(taskConfiguration, outputs, sourceInstanceID,
recordBuffer); recordBuffer);
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID, streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID,
outputs); outputs);
StreamComponentFactory.setFailListener(recordBuffer, sourceInstanceID, streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID,
outputs); outputs);
} }
......
...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent; ...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
...@@ -39,8 +38,7 @@ public class StreamTask extends AbstractTask { ...@@ -39,8 +38,7 @@ public class StreamTask extends AbstractTask {
private UserTaskInvokable userFunction; private UserTaskInvokable userFunction;
private static int numTasks = 0; private static int numTasks = 0;
private String taskInstanceID = ""; private String taskInstanceID = "";
StreamComponentHelper<StreamTask> streamTaskHelper;
private Random rnd = new Random();
private FaultTolerancyBuffer recordBuffer; private FaultTolerancyBuffer recordBuffer;
...@@ -52,24 +50,26 @@ public class StreamTask extends AbstractTask { ...@@ -52,24 +50,26 @@ public class StreamTask extends AbstractTask {
userFunction = null; userFunction = null;
numTasks++; numTasks++;
taskInstanceID = Integer.toString(numTasks); taskInstanceID = Integer.toString(numTasks);
streamTaskHelper = new StreamComponentHelper<StreamTask>();
} }
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration(); Configuration taskConfiguration = getTaskConfiguration();
StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs); try {
StreamComponentFactory.setConfigOutputs(this, taskConfiguration, outputs, streamTaskHelper.setConfigInputs(this, taskConfiguration, inputs);
partitioners); streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
} catch (Exception e) {
e.printStackTrace();
}
recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID); recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID);
userFunction = (UserTaskInvokable) StreamComponentFactory.setUserFunction( userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(
taskConfiguration, outputs, taskInstanceID, recordBuffer); taskConfiguration, outputs, taskInstanceID, recordBuffer);
StreamComponentFactory streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
.setAckListener(recordBuffer, taskInstanceID, outputs); streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
StreamComponentFactory.setFailListener(recordBuffer, taskInstanceID,
outputs);
} }
@Override @Override
...@@ -82,29 +82,12 @@ public class StreamTask extends AbstractTask { ...@@ -82,29 +82,12 @@ public class StreamTask extends AbstractTask {
hasInput = true; hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next()); StreamRecord streamRecord = new StreamRecord(input.next());
String id = streamRecord.getId(); String id = streamRecord.getId();
//TODO create method for concurrent publishing // TODO create method for concurrent publishing
try { try {
userFunction.invoke(streamRecord.getRecord()); userFunction.invoke(streamRecord.getRecord());
streamTaskHelper.threadSafePublish(new AckEvent(id), input);
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
input.publishEvent(new AckEvent(id));
concurrentModificationOccured = true;
} catch (Exception e) {
Thread.sleep(rnd.nextInt(50));
}
}
} catch (Exception e) { } catch (Exception e) {
boolean concurrentModificationOccured = false; streamTaskHelper.threadSafePublish(new FailEvent(id), input);
while (!concurrentModificationOccured) {
try {
input.publishEvent(new FailEvent(id));
concurrentModificationOccured = true;
} catch (Exception f) {
Thread.sleep(rnd.nextInt(50));
}
}
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册