提交 794e2804 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] FaultTolerance set via JobGraphBuilder config

上级 d15a8f04
......@@ -78,11 +78,6 @@
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
<build>
......
......@@ -41,6 +41,7 @@ import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......@@ -58,14 +59,14 @@ public class JobGraphBuilder {
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
private FaultToleranceType faultToleranceType;
/**
* Creates a new JobGraph with the given name
*
* @param jobGraphName
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName) {
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
......@@ -73,6 +74,7 @@ public class JobGraphBuilder {
maxParallelismVertexName = "";
maxParallelism = 0;
log.debug("JobGraph created");
this.faultToleranceType = faultToleranceType;
}
/**
......@@ -187,6 +189,9 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("faultToleranceType", faultToleranceType.id);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
}
......
......@@ -34,10 +34,12 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......@@ -51,6 +53,37 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return numComponents;
}
/**
 * Chooses the RecordInvoker for the configured fault tolerance mode and, for
 * fault-tolerant modes, builds a FaultToleranceUtil over the given outputs.
 *
 * NOTE(review): Java passes references by value, so the assignments to the
 * util and type parameters below are invisible to the caller. Callers
 * (StreamSource/StreamTask) therefore still see their original recordBuffer
 * — which is never initialized by this method — and later hand that possibly
 * null buffer to getUserFunction/setAckListener. Consider returning the util
 * instead of assigning the parameter. TODO confirm intended behavior.
 *
 * @param util
 *            ignored on entry; reassigned locally only (see note above)
 * @param type
 *            ignored on entry; overwritten from config below
 * @param config
 *            task configuration carrying the "faultToleranceType" id
 * @param outputs
 *            record writers the fault tolerance buffer would track
 * @param taskInstanceID
 *            id of this component instance
 * @param name
 *            component name (used for logging inside the util)
 * @param numberOfOutputChannels
 *            fan-out per output channel
 * @return the invoker strategy matching the configured fault tolerance type
 */
public RecordInvoker setFaultTolerance(FaultToleranceUtil util, FaultToleranceType type,
Configuration config, List<RecordWriter<StreamRecord>> outputs, int taskInstanceID,
String name, int[] numberOfOutputChannels) {
// The incoming parameter value is discarded; the mode always comes from config.
type = FaultToleranceType.from(config.getInteger("faultToleranceType", 0));
RecordInvoker invoker = getRecordInvoker(type);
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
// Local reassignment only — never reaches the caller (see note above).
util = new FaultToleranceUtil(type, outputs, taskInstanceID, name,
numberOfOutputChannels);
break;
case NONE:
default:
util = null;
break;
}
return invoker;
}
/**
 * Maps a fault tolerance mode to the invoker strategy implementing it: the
 * acknowledging invoker for AT_LEAST_ONCE / EXACTLY_ONCE, the plain invoker
 * for NONE (and any other value).
 */
public RecordInvoker getRecordInvoker(FaultToleranceType type) {
	final RecordInvoker invoker;
	switch (type) {
	case AT_LEAST_ONCE:
	case EXACTLY_ONCE:
		invoker = new InvokerWithFaultTolerance();
		break;
	case NONE:
	default:
		invoker = new Invoker();
		break;
	}
	return invoker;
}
public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
......@@ -75,22 +108,25 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public void setConfigInputs(T taskBase, Configuration taskConfiguration, List<StreamRecordReader<StreamRecord>> inputs)
throws StreamComponentException {
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<StreamRecordReader<StreamRecord>> inputs) throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase, StreamRecord.class));
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase,
StreamRecord.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase, StreamRecord.class));
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase,
StreamRecord.class));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
}
}
public void setConfigOutputs(T taskBase, Configuration taskConfiguration, List<RecordWriter<StreamRecord>> outputs,
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
......@@ -98,10 +134,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase, StreamRecord.class, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase, StreamRecord.class,
outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
......@@ -109,9 +146,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
try {
......@@ -122,21 +158,28 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
// TODO consider logging stack trace!
public StreamInvokableComponent getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name, FaultToleranceUtil recordBuffer) {
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name,
FaultToleranceUtil recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultTaskInvokable.class, StreamInvokableComponent.class);
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class);
StreamInvokableComponent userFunction = null;
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceBuffer", 0));
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
userFunction
.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
// log.error(e.getStackTrace());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
// log.error(e.getStackTrace());
}
return userFunction;
}
......@@ -158,27 +201,29 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass("partitionerClass_"
+ nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
int keyPosition = taskConfiguration
.getInteger("partitionerIntParam_" + nrOutput, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
partitioners.add(partitioner.newInstance());
}
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput + " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
} catch (Exception e) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs", e);
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with "
+ nrOutput + " outputs", e);
}
}
public void invokeRecords(RecordInvokable userFunction, List<StreamRecordReader<StreamRecord>> inputs, String name)
throws Exception {
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
List<StreamRecordReader<StreamRecord>> inputs, String name) throws Exception {
List<StreamRecordReader<StreamRecord>> closedInputs = new LinkedList<StreamRecordReader<StreamRecord>>();
boolean hasInput = true;
while (hasInput) {
......@@ -186,13 +231,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
for (StreamRecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
StreamRecord record = input.next();
// UID id = record.getId();
userFunction.invoke(record);
// threadSafePublish(new AckEvent(id), input);
// log.debug("ACK: " + id + " -- " + name);
}
else if (input.isInputClosed()) {
invoker.call(name, userFunction, input);
} else if (input.isInputClosed()) {
closedInputs.add(input);
}
}
......@@ -200,4 +240,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
/**
 * Strategy for consuming one record from an input reader and handing it to
 * the user function; implementations differ only in whether they publish an
 * ack event after a successful invocation.
 */
public static interface RecordInvoker {
void call(String name, RecordInvokable userFunction, StreamRecordReader<StreamRecord> input)
throws Exception;
}
/**
 * Record invoker used for AT_LEAST_ONCE / EXACTLY_ONCE: after the user
 * function has processed a record, an AckEvent carrying the record's UID is
 * published back over the input channel so the sender's fault tolerance
 * buffer can release the record.
 */
public class InvokerWithFaultTolerance implements RecordInvoker {

@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
// Capture the id before invoke(): the ack must reference this record.
UID id = record.getId();
userFunction.invoke(record);
// Ack only after invoke() returned normally — an exception above skips
// the ack, leaving the record unacknowledged at the sender.
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
}
}
/**
 * Plain record invoker for FaultToleranceType.NONE: hands each record
 * straight to the user function without publishing any ack/fail events.
 */
public static class Invoker implements RecordInvoker {

	@Override
	public void call(String name, RecordInvokable userFunction,
			StreamRecordReader<StreamRecord> input) throws Exception {
		// No bookkeeping required — consume the next record and invoke directly.
		userFunction.invoke(input.next());
	}
}
}
\ No newline at end of file
......@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -35,36 +36,47 @@ public abstract class StreamInvokableComponent {
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords) {
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
/**
 * Broadcasts the record to every output channel, registering it with the
 * fault tolerance buffer first (when fault tolerance is enabled) and marking
 * it failed there if any writer throws.
 */
public final void emit(StreamRecord record) {
// Stamp the record with this component's channel id before buffering/sending.
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
// NOTE(review): a failure on output k still leaves the record already
// emitted on outputs 0..k-1, yet failRecord marks the whole record failed.
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
// TODO: Add fault tolerance
// TODO: Should we fail record at exception catch?
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
// emittedRecords.addRecord(record, outputChannel);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
......
......@@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
......@@ -34,6 +36,7 @@ public class StreamSink extends AbstractOutputTask {
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
private RecordInvoker invoker;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -52,13 +55,17 @@ public class StreamSink extends AbstractOutputTask {
} catch (Exception e) {
log.error("Cannot register inputs", e);
}
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0));
invoker = streamSinkHelper.getRecordInvoker(faultToleranceType);
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
streamSinkHelper.invokeRecords(userFunction, inputs, name);
streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name);
System.out.println("Result: "+userFunction.getResult());
log.debug("SINK " + name + " invoke finished");
}
......
......@@ -26,8 +26,10 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> {
......@@ -41,6 +43,8 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
private int sourceInstanceID;
private String name;
private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() {
......@@ -49,7 +53,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
streamSourceHelper = new StreamComponentHelper<StreamSource>();
numSources=StreamComponentHelper.newComponent();
numSources = StreamComponentHelper.newComponent();
sourceInstanceID = numSources;
}
......@@ -69,20 +73,23 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
} catch (StreamComponentException e) {
log.error("Cannot register outputs", e);
}
int[] numberOfOutputChannels= new int[outputs.size()];
for(int i=0; i<numberOfOutputChannels.length;i++ ){
numberOfOutputChannels[i]=taskConfiguration.getInteger("channels_"+i, 0);
int[] numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(
taskConfiguration, outputs, sourceInstanceID, name, recordBuffer);
// recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name,
// numberOfOutputChannels);
invoker = streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, sourceInstanceID, name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(taskConfiguration,
outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
}
......@@ -93,7 +100,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
userFunction.invoke();
// TODO print to file
System.out.println(userFunction.getResult());
}
}
......@@ -26,7 +26,9 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask {
......@@ -40,7 +42,9 @@ public class StreamTask extends AbstractTask {
private static int numTasks;
private int taskInstanceID;
private String name;
StreamComponentHelper<StreamTask> streamTaskHelper;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer;
......@@ -73,7 +77,9 @@ public class StreamTask extends AbstractTask {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels);
invoker = streamTaskHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, taskInstanceID, name, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration,
outputs, taskInstanceID, name, recordBuffer);
......@@ -84,10 +90,10 @@ public class StreamTask extends AbstractTask {
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(userFunction, inputs, name);
streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
}
}
\ No newline at end of file
......@@ -28,6 +28,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BasicTopology {
......@@ -69,7 +70,7 @@ public class BasicTopology {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology");
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology", FaultToleranceType.NONE);
graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1);
graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1);
graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchForwardLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class CellInfoLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("infoSource", InfoSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", CellTask.class, 3, 1);
......
......@@ -29,6 +29,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
......@@ -144,7 +145,7 @@ public class IncrementalLearningSkeleton {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning", FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......
......@@ -33,6 +33,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
......@@ -46,7 +47,7 @@ public class IncrementalOLS {
@Override
public void invoke() throws Exception {
record.initRecords();
while(true) {
for (int j = 0; j < 100; j++) {
// pull new record from data source
record.setTuple(getNewData());
emit(record);
......@@ -63,7 +64,7 @@ public class IncrementalOLS {
public static class TrainingDataSource extends UserSourceInvokable {
private final int BATCH_SIZE = 1000;
private final int BATCH_SIZE = 10;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
......@@ -74,7 +75,7 @@ public class IncrementalOLS {
record.initRecords();
while(true) {
for (int j = 0; j < 1000; j++) {
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
......@@ -165,11 +166,12 @@ public class IncrementalOLS {
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record.getTuple());
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS");
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS", FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......
......@@ -24,13 +24,14 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......
......@@ -34,6 +34,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -141,7 +142,7 @@ public class WordCountRemote {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
......
......@@ -24,6 +24,7 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
......@@ -31,7 +32,7 @@ public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
......
package eu.stratosphere.streaming.faulttolerance;
import java.util.HashMap;
import java.util.Map;
/**
 * Fault tolerance modes of the streaming runtime, each with a stable numeric
 * id so the mode can be round-tripped through a task Configuration
 * (written by JobGraphBuilder as "faultToleranceType").
 */
public enum FaultToleranceType {
	NONE(0), AT_LEAST_ONCE(1), EXACTLY_ONCE(2);

	/** Numeric id used to serialize this type into a Configuration. */
	public final int id;

	FaultToleranceType(int id) {
		this.id = id;
	}

	/** Reverse index from numeric id to constant, built once at class load. */
	private static final Map<Integer, FaultToleranceType> map = new HashMap<Integer, FaultToleranceType>();

	static {
		for (FaultToleranceType type : FaultToleranceType.values()) {
			map.put(type.id, type);
		}
	}

	/**
	 * Looks up the type for a numeric id as read from a Configuration.
	 *
	 * @param id
	 *            numeric id previously written via {@link #id}
	 * @return the matching type, or NONE when the id is unknown — previously
	 *         this returned null (Map.get on an absent key), which made
	 *         callers throw a NullPointerException on the first switch/compare
	 */
	public static FaultToleranceType from(int id) {
		FaultToleranceType type = map.get(id);
		return type == null ? NONE : type;
	}
}
\ No newline at end of file
......@@ -39,10 +39,9 @@ public class FaultToleranceUtil {
private final int componentID;
private int numberOfChannels;
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public FaultToleranceType type;
public PerformanceTracker tracker;
public PerformanceCounter counter;
......@@ -57,20 +56,22 @@ public class FaultToleranceUtil {
* @param numberOfChannels
* Number of output channels for the output components
*/
// TODO:get faulttolerancy type from user config, update logs for channel
// TODO:update logs for channel
// acks and fails
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = true;
if (exactlyOnce) {
this.type = type;
switch (type) {
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case AT_LEAST_ONCE: case NONE: default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
......@@ -80,18 +81,19 @@ public class FaultToleranceUtil {
}
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
String componentName, int[] numberOfChannels) {
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs,
int sourceInstanceID, String componentName, int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = false;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
switch (type) {
case AT_LEAST_ONCE:
default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
}
tracker = new PerformanceTracker("pc", 1000, 1000, 10000,
......@@ -149,7 +151,7 @@ public class FaultToleranceUtil {
*/
public void failRecord(UID recordID, int channel) {
// if by ft type
if (exactlyOnce) {
if (type == FaultToleranceType.EXACTLY_ONCE) {
StreamRecord failed = buffer.failChannel(recordID, channel);
if (failed != null) {
......
package eu.stratosphere.streaming.rabbitmq;
import java.net.InetSocketAddress;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
/**
 * Demo streaming topology that reads String messages from a local RabbitMQ
 * queue and prints them on a sink, executed on a Nephele mini cluster.
 */
public class RMQTopology {
	/**
	 * Source that consumes messages from the RabbitMQ queue "hello" on
	 * localhost and emits each message body as a one-field record. A message
	 * with the body "quit" terminates the source.
	 */
	public static class RMQSource extends UserSourceInvokable {
		StreamRecord record = new StreamRecord(new Tuple1<String>());
		@Override
		public void invoke() throws Exception {
			String QUEUE_NAME = "hello";
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");
			Connection connection = factory.newConnection();
			try {
				Channel channel = connection.createChannel();
				channel.queueDeclare(QUEUE_NAME, false, false, false, null);
				QueueingConsumer consumer = new QueueingConsumer(channel);
				// autoAck = true: messages are acknowledged on delivery
				channel.basicConsume(QUEUE_NAME, true, consumer);
				while (true) {
					// blocks until the next message arrives
					QueueingConsumer.Delivery delivery = consumer.nextDelivery();
					// NOTE(review): decodes with the platform default charset,
					// as before — confirm producers send UTF-8 if relied upon
					String message = new String(delivery.getBody());
					if (message.equals("quit")) {
						break;
					}
					record.setString(0, message);
					emit(record);
				}
			} finally {
				// release the broker connection even if consumption fails
				connection.close();
			}
		}
	}
	/** Sink that prints every received record to stdout. */
	public static class Sink extends UserSinkInvokable {
		@Override
		public void invoke(StreamRecord record) throws Exception {
			System.out.println(record);
		}
	}
	/**
	 * Builds the Source -> Sink job graph. Fault tolerance is disabled for
	 * this demo; the type is now a mandatory JobGraphBuilder constructor
	 * argument (was previously the one-arg constructor, which no longer
	 * exists).
	 */
	private static JobGraph getJobGraph() throws Exception {
		JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE);
		graphBuilder.setSource("Source", RMQSource.class, 1, 1);
		graphBuilder.setSink("Sink", Sink.class, 1, 1);
		graphBuilder.shuffleConnect("Source", "Sink");
		return graphBuilder.getJobGraph();
	}
	/**
	 * Runs the topology on a local mini cluster. Failures are printed instead
	 * of being silently swallowed, and the cluster is stopped even when the
	 * job fails.
	 */
	public static void main(String[] args) {
		try {
			JobGraph jG = getJobGraph();
			Configuration configuration = jG.getJobConfiguration();
			System.out.println("Running in Local mode");
			NepheleMiniCluster exec = new NepheleMiniCluster();
			exec.start();
			try {
				Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
				client.run(jG, true);
			} finally {
				// shut the mini cluster down even if the job run fails
				exec.stop();
			}
		} catch (Exception e) {
			// previously an empty catch — surface the failure for diagnosis
			e.printStackTrace();
		}
	}
}
......@@ -21,11 +21,7 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -40,11 +36,12 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class StreamComponentTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
private static boolean fPTest = true;
public static class MySource extends UserSourceInvokable {
public MySource() {
......@@ -54,7 +51,7 @@ public class StreamComponentTest {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 100; i++) {
record.setField(0, i);
emit(record);
}
......@@ -92,12 +89,9 @@ public class StreamComponentTest {
@BeforeClass
public static void runStream() {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
root.addAppender(new ConsoleAppender());
root.setLevel(Level.OFF);
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", MySource.class);
graphBuilder.setTask("MyTask", MyTask.class, 2, 2);
graphBuilder.setSink("MySink", MySink.class);
......@@ -123,10 +117,7 @@ public class StreamComponentTest {
@Test
public void test() {
Assert.assertTrue(fPTest);
assertEquals(1000, data.keySet().size());
assertEquals(100, data.keySet().size());
for (Integer k : data.keySet()) {
assertEquals((Integer) (k + 1), data.get(k));
......
package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.*;
import org.junit.Test;
/**
 * Unit test for the numeric-id decoding of {@link FaultToleranceType}.
 */
public class FaultToleranceTypeTest {
	/**
	 * Every declared id must resolve back to its corresponding enum constant
	 * through FaultToleranceType.from(int).
	 */
	@Test
	public void test() {
		assertEquals(FaultToleranceType.EXACTLY_ONCE, FaultToleranceType.from(2));
		assertEquals(FaultToleranceType.AT_LEAST_ONCE, FaultToleranceType.from(1));
		assertEquals(FaultToleranceType.NONE, FaultToleranceType.from(0));
	}
}
......@@ -33,7 +33,7 @@ public class FaultToleranceUtilTest {
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
int[] numOfOutputchannels = { 1, 2 };
faultTolerancyBuffer = new FaultToleranceUtil(outputs, 1, numOfOutputchannels);
faultTolerancyBuffer = new FaultToleranceUtil(FaultToleranceType.EXACTLY_ONCE, outputs, 1, numOfOutputchannels);
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册