提交 38638c8c 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] enabled exactly once processing for fault tolerance

上级 e267208b
......@@ -36,6 +36,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.RecordSizeMismatchException;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......@@ -46,55 +47,55 @@ import eu.stratosphere.types.StringValue;
public final class StreamComponentHelper<T extends AbstractInvokable> {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
private static int numComponents=0;
public static int newComponent(){
private static int numComponents = 0;
public static int newComponent() {
numComponents++;
return numComponents;
}
public void setAckListener(FaultToleranceUtil recordBuffer,
String sourceInstanceID, List<RecordWriter<StreamRecord>> outputs) {
EventListener eventListener = new eu.stratosphere.streaming.faulttolerance.AckEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<StreamRecord> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, AckEvent.class);
public void setAckListener(FaultToleranceUtil recordBuffer, String sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
EventListener[] ackListeners = new EventListener[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
ackListeners[i] = new AckEventListener(sourceInstanceID, recordBuffer, i);
outputs.get(i).subscribeToEvent(ackListeners[i], AckEvent.class);
}
}
public void setFailListener(FaultToleranceUtil recordBuffer,
String sourceInstanceID, List<RecordWriter<StreamRecord>> outputs) {
EventListener eventListener = new FailEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<StreamRecord> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, FailEvent.class);
public void setFailListener(FaultToleranceUtil recordBuffer, String sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
EventListener[] failListeners = new EventListener[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
failListeners[i] = new FailEventListener(sourceInstanceID, recordBuffer, i);
outputs.get(i).subscribeToEvent(failListeners[i], FailEvent.class);
}
}
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<RecordReader<StreamRecord>> inputs) throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
inputs.add(new RecordReader<StreamRecord>((StreamTask) taskBase,
StreamRecord.class));
inputs.add(new RecordReader<StreamRecord>((StreamTask) taskBase, StreamRecord.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new RecordReader<StreamRecord>((StreamSink) taskBase,
StreamRecord.class));
inputs.add(new RecordReader<StreamRecord>((StreamSink) taskBase, StreamRecord.class));
} else {
throw new StreamComponentException(
"Nonsupported object passed to setConfigInputs");
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
}
}
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners)
throws StreamComponentException {
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners);
......@@ -107,56 +108,48 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException(
"Nonsupported object passed to setConfigOutputs");
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
}
}
public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSinkInvokable.class,
UserSinkInvokable.class);
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: "
+ userFunctionClass.getSimpleName());
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
return userFunction;
}
public StreamInvokableComponent getUserFunction(
Configuration taskConfiguration,
public StreamInvokableComponent getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, String instanceID, String name,
FaultToleranceUtil recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class,
StreamInvokableComponent.class);
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class);
StreamInvokableComponent userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: "
+ userFunctionClass.getSimpleName());
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: "
+ userFunctionClass.getSimpleName());
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
return userFunction;
}
// TODO find a better solution for this
public void threadSafePublish(AbstractTaskEvent event,
RecordReader<StreamRecord> input) throws InterruptedException,
IOException {
public void threadSafePublish(AbstractTaskEvent event, RecordReader<StreamRecord> input)
throws InterruptedException, IOException {
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
......@@ -171,33 +164,33 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
int keyPosition = taskConfiguration
.getInteger("partitionerIntParam_" + nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass("partitionerClassParam_"
+ nrOutput, StringValue.class, Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
partitioners.add(partitioner.getConstructor(int.class, Class.class).newInstance(
keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with "
+ nrOutput + " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
} catch (Exception e) {
log.error(
"Error while setting partitioner: " + partitioner.getSimpleName()
+ " with " + nrOutput + " outputs", e);
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with "
+ nrOutput + " outputs", e);
}
}
public void invokeRecords(RecordInvokable userFunction, List<RecordReader<StreamRecord>> inputs, String name) throws IOException, InterruptedException {
public void invokeRecords(RecordInvokable userFunction,
List<RecordReader<StreamRecord>> inputs, String name) throws IOException,
InterruptedException {
boolean hasInput = true;
while (hasInput) {
hasInput = false;
......@@ -210,13 +203,14 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
userFunction.invoke(record);
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
//TODO: write an exception class to throw forward
// TODO: write an exception class to throw forward
} catch (RecordSizeMismatchException e) {
throw (e);
} catch (Exception e) {
e.printStackTrace();
threadSafePublish(new FailEvent(id), input);
log.warn("FAILED: " + id + " -- " + name + " -- due to " + e.getClass().getSimpleName());
log.warn("FAILED: " + id + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
}
......
......@@ -31,6 +31,7 @@ public class AckEventListener implements EventListener {
private String taskInstanceID;
private FaultToleranceUtil recordBuffer;
private int output;
/**
* Creates an AckEventListener that monitors AckEvents sent to task with the
......@@ -41,9 +42,10 @@ public class AckEventListener implements EventListener {
* @param recordBuffer
* The fault tolerance buffer associated with this task
*/
public AckEventListener(String taskInstanceID, FaultToleranceUtil recordBuffer) {
public AckEventListener(String taskInstanceID, FaultToleranceUtil recordBuffer, int output) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
this.output=output;
}
/**
......@@ -57,9 +59,9 @@ public class AckEventListener implements EventListener {
if (ackChannelId.equals(taskInstanceID)) {
Long nt = System.nanoTime();
recordBuffer.ackRecord(ackEvent.getRecordId(),0);
recordBuffer.ackRecord(ackEvent.getRecordId(),output);
log.debug("ACK PROCESSED: " + ackEvent.getRecordId() + " exec. time (ns): " + (System.nanoTime() - nt));
log.debug("ACK PROCESSED: "+output+" " + ackEvent.getRecordId() + " exec. time (ns): " + (System.nanoTime() - nt));
}
}
}
......@@ -31,6 +31,7 @@ public class FailEventListener implements EventListener {
private String taskInstanceID;
private FaultToleranceUtil recordBuffer;
private int output;
/**
* Creates a FailEventListener that monitors FailEvents sent to task with
......@@ -41,9 +42,10 @@ public class FailEventListener implements EventListener {
* @param recordBuffer
* The fault tolerance buffer associated with this task
*/
public FailEventListener(String taskInstanceID, FaultToleranceUtil recordBuffer) {
public FailEventListener(String taskInstanceID, FaultToleranceUtil recordBuffer, int output) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
this.output = output;
}
/**
......@@ -56,9 +58,8 @@ public class FailEventListener implements EventListener {
String recordId = failEvent.getRecordId();
String failCID = recordId.split("-", 2)[0];
if (failCID.equals(taskInstanceID)) {
recordBuffer.failRecord(recordId,0);
// log.warn("FAIL RECIEVED: " + recordId + " containing: " + recordBuffer.getRecordBuffer());
//log.trace(recordBuffer.getRecordBuffer());
recordBuffer.failRecord(recordId, output);
log.warn("FAIL RECIEVED: "+output +" "+ recordId);
}
}
......
......@@ -15,15 +15,7 @@
package eu.stratosphere.streaming.faulttolerance;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -44,7 +36,8 @@ public class FaultToleranceUtil {
private final String componentID;
private int numberOfChannels;
private int[] numberOfOutputChannels;
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
......@@ -59,14 +52,20 @@ public class FaultToleranceUtil {
* @param numberOfChannels
* Number of output channels for the output components
*/
//TODO:get faulttolerancy type from user config, update logs for channel acks and fails
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, String componentID,
int[] numberOfChannels) {
this.outputs = outputs;
this.numberOfOutputChannels = numberOfChannels;
this.componentID = componentID;
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, componentID);
exactlyOnce = true;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, componentID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, componentID);
}
}
......@@ -109,15 +108,14 @@ public class FaultToleranceUtil {
*/
public void failRecord(String recordID, int channel) {
// if by ft type
if (true) {
failRecord(recordID);
} else {
if (exactlyOnce) {
StreamRecord failed = buffer.failChannel(recordID, channel);
if (failed != null) {
reEmit(failed, channel);
}
} else {
failRecord(recordID);
}
}
......@@ -179,7 +177,6 @@ public class FaultToleranceUtil {
}
public List<RecordWriter<StreamRecord>> getOutputs() {
return this.outputs;
}
......
......@@ -29,7 +29,7 @@ public class WordCountDummySource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 10000; i++) {
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
} else {
......
......@@ -45,7 +45,7 @@ public class WordCountLocal {
//TODO: arguments check
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册