diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml index 639f28a301a6ec814bfa4b14eb428c818f9bda7d..127ada1e396d46b2d74253b7ac7da996bba9f0e8 100644 --- a/flink-addons/flink-streaming/pom.xml +++ b/flink-addons/flink-streaming/pom.xml @@ -78,11 +78,6 @@ log4j 1.2.16 - - com.rabbitmq - amqp-client - 3.3.1 - diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java index 05d0ca9632090b4afcde288372dcb09863b32ac3..5a6b80fcbbb6fe770210e27cd17bd37d0f13186d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java @@ -41,6 +41,7 @@ import eu.stratosphere.streaming.api.streamcomponent.StreamSink; import eu.stratosphere.streaming.api.streamcomponent.StreamSource; import eu.stratosphere.streaming.api.streamcomponent.StreamTask; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.partitioner.BroadcastPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.streaming.partitioner.GlobalPartitioner; @@ -58,14 +59,14 @@ public class JobGraphBuilder { private Map> numberOfOutputChannels; private String maxParallelismVertexName; private int maxParallelism; - + private FaultToleranceType faultToleranceType; /** * Creates a new JobGraph with the given name * * @param jobGraphName * Name of the JobGraph */ - public JobGraphBuilder(String jobGraphName) { + public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) { jobGraph = new JobGraph(jobGraphName); components = new HashMap(); numberOfInstances = new HashMap(); @@ -73,6 +74,7 @@ public class JobGraphBuilder { maxParallelismVertexName = ""; maxParallelism = 0; log.debug("JobGraph created"); + this.faultToleranceType = faultToleranceType; } /** @@ -187,6 +189,9 @@ public class JobGraphBuilder { Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration(); config.setClass("userfunction", InvokableClass); config.setString("componentName", componentName); + + config.setInteger("faultToleranceType", faultToleranceType.id); + components.put(componentName, component); numberOfInstances.put(componentName, parallelism); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 3c9c55c5e956a7e04df5324c9409d4707faed439..e981def9b979d27250a8bb54b1bad83518a00bb3 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -34,10 +34,12 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; import eu.stratosphere.streaming.api.invokable.RecordInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.api.streamrecord.UID; import eu.stratosphere.streaming.faulttolerance.AckEvent; import eu.stratosphere.streaming.faulttolerance.AckEventListener; import eu.stratosphere.streaming.faulttolerance.FailEvent; import eu.stratosphere.streaming.faulttolerance.FailEventListener; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner; @@ -51,6 +53,37 @@ public final class StreamComponentHelper { return numComponents; } + public RecordInvoker setFaultTolerance(FaultToleranceUtil util, FaultToleranceType type, + Configuration config, List> outputs, int taskInstanceID, + String name, int[] numberOfOutputChannels) { + type = FaultToleranceType.from(config.getInteger("faultToleranceType", 0)); + + RecordInvoker invoker = getRecordInvoker(type); + switch (type) { + case AT_LEAST_ONCE: + case EXACTLY_ONCE: + util = new FaultToleranceUtil(type, outputs, taskInstanceID, name, + numberOfOutputChannels); + break; + case NONE: + default: + util = null; + break; + } + return invoker; + } + + public RecordInvoker getRecordInvoker(FaultToleranceType type) { + switch (type) { + case AT_LEAST_ONCE: + case EXACTLY_ONCE: + return new InvokerWithFaultTolerance(); + case NONE: + default: + return new Invoker(); + } + } + public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID, List> outputs) { @@ -75,22 +108,25 @@ public final class StreamComponentHelper { } - public void setConfigInputs(T taskBase, Configuration taskConfiguration, List> inputs) - throws StreamComponentException { + public void setConfigInputs(T taskBase, Configuration taskConfiguration, + List> inputs) throws StreamComponentException { int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); for (int i = 0; i < numberOfInputs; i++) { if (taskBase instanceof StreamTask) { - inputs.add(new StreamRecordReader((StreamTask) taskBase, StreamRecord.class)); + inputs.add(new StreamRecordReader((StreamTask) taskBase, + StreamRecord.class)); } else if (taskBase instanceof StreamSink) { - inputs.add(new StreamRecordReader((StreamSink) taskBase, StreamRecord.class)); + inputs.add(new StreamRecordReader((StreamSink) taskBase, + StreamRecord.class)); } else { throw new StreamComponentException("Nonsupported object passed to setConfigInputs"); } } } - public void setConfigOutputs(T taskBase, Configuration taskConfiguration, List> outputs, + public void setConfigOutputs(T taskBase, Configuration taskConfiguration, + List> outputs, List> partitioners) throws StreamComponentException { int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0); for (int i = 0; i < numberOfOutputs; i++) { @@ -98,10 +134,11 @@ public final class StreamComponentHelper { } for (ChannelSelector outputPartitioner : partitioners) { if (taskBase instanceof StreamTask) { - outputs.add(new RecordWriter((StreamTask) taskBase, StreamRecord.class, outputPartitioner)); + outputs.add(new RecordWriter((StreamTask) taskBase, + StreamRecord.class, outputPartitioner)); } else if (taskBase instanceof StreamSource) { - outputs.add(new RecordWriter((StreamSource) taskBase, StreamRecord.class, - outputPartitioner)); + outputs.add(new RecordWriter((StreamSource) taskBase, + StreamRecord.class, outputPartitioner)); } else { throw new StreamComponentException("Nonsupported object passed to setConfigOutputs"); } @@ -109,9 +146,8 @@ public final class StreamComponentHelper { } public UserSinkInvokable getUserFunction(Configuration taskConfiguration) { - - Class userFunctionClass = taskConfiguration.getClass("userfunction", - DefaultSinkInvokable.class, UserSinkInvokable.class); + Class userFunctionClass = taskConfiguration.getClass( + "userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class); UserSinkInvokable userFunction = null; try { @@ -122,21 +158,28 @@ public final class StreamComponentHelper { return userFunction; } + // TODO consider logging stack trace! public StreamInvokableComponent getUserFunction(Configuration taskConfiguration, - List> outputs, int instanceID, String name, FaultToleranceUtil recordBuffer) { + List> outputs, int instanceID, String name, + FaultToleranceUtil recordBuffer) { // Default value is a TaskInvokable even if it was called from a source - Class userFunctionClass = taskConfiguration.getClass("userfunction", - DefaultTaskInvokable.class, StreamInvokableComponent.class); + Class userFunctionClass = taskConfiguration.getClass( + "userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class); StreamInvokableComponent userFunction = null; + FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration + .getInteger("faultToleranceBuffer", 0)); try { userFunction = userFunctionClass.newInstance(); - userFunction.declareOutputs(outputs, instanceID, name, recordBuffer); + userFunction + .declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType); } catch (InstantiationException e) { log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName()); + // log.error(e.getStackTrace()); } catch (Exception e) { log.error("Cannot use user function: " + userFunctionClass.getSimpleName()); + // log.error(e.getStackTrace()); } return userFunction; } @@ -158,27 +201,29 @@ public final class StreamComponentHelper { private void setPartitioner(Configuration taskConfiguration, int nrOutput, List> partitioners) { - Class> partitioner = taskConfiguration.getClass("partitionerClass_" - + nrOutput, DefaultPartitioner.class, ChannelSelector.class); + Class> partitioner = taskConfiguration.getClass( + "partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class); try { if (partitioner.equals(FieldsPartitioner.class)) { - int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1); + int keyPosition = taskConfiguration + .getInteger("partitionerIntParam_" + nrOutput, 1); partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition)); } else { partitioners.add(partitioner.newInstance()); } - log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput + " outputs"); + log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput + + " outputs"); } catch (Exception e) { - log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with " + nrOutput - + " outputs", e); + log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with " + + nrOutput + " outputs", e); } } - public void invokeRecords(RecordInvokable userFunction, List> inputs, String name) - throws Exception { + public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction, + List> inputs, String name) throws Exception { List> closedInputs = new LinkedList>(); boolean hasInput = true; while (hasInput) { @@ -186,13 +231,8 @@ public final class StreamComponentHelper { for (StreamRecordReader input : inputs) { if (input.hasNext()) { hasInput = true; - StreamRecord record = input.next(); -// UID id = record.getId(); - userFunction.invoke(record); -// threadSafePublish(new AckEvent(id), input); -// log.debug("ACK: " + id + " -- " + name); - } - else if (input.isInputClosed()) { + invoker.call(name, userFunction, input); + } else if (input.isInputClosed()) { closedInputs.add(input); } } @@ -200,4 +240,30 @@ public final class StreamComponentHelper { } } + public static interface RecordInvoker { + void call(String name, RecordInvokable userFunction, StreamRecordReader input) + throws Exception; + } + + public class InvokerWithFaultTolerance implements RecordInvoker { + @Override + public void call(String name, RecordInvokable userFunction, + StreamRecordReader input) throws Exception { + StreamRecord record = input.next(); + UID id = record.getId(); + userFunction.invoke(record); + threadSafePublish(new AckEvent(id), input); + log.debug("ACK: " + id + " -- " + name); + } + } + + public static class Invoker implements RecordInvoker { + @Override + public void call(String name, RecordInvokable userFunction, + StreamRecordReader input) throws Exception { + StreamRecord record = input.next(); + userFunction.invoke(record); + } + } + } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java index 6f0ccc9029c3ed2e94fbdaa03096743858500119..714e9b3b151e17d28c3bb0f7c9debaa57d077dcb 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokableComponent.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; import eu.stratosphere.streaming.util.PerformanceCounter; @@ -35,36 +36,47 @@ public abstract class StreamInvokableComponent { protected String name; private FaultToleranceUtil emittedRecords; protected PerformanceCounter performanceCounter; + private boolean useFaultTolerance; public final void declareOutputs(List> outputs, int channelID, - String name, FaultToleranceUtil emittedRecords) { + String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) { this.outputs = outputs; this.channelID = channelID; this.emittedRecords = emittedRecords; this.name = name; this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000, "/home/strato/stratosphere-distrib/log/counter/" + name + channelID); + this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE; } public final void emit(StreamRecord record) { record.setId(channelID); - // emittedRecords.addRecord(record); + + if (useFaultTolerance) { + emittedRecords.addRecord(record); + } + try { for (RecordWriter output : outputs) { output.emit(record); log.info("EMITTED: " + record.getId() + " -- " + name); } } catch (Exception e) { - emittedRecords.failRecord(record.getId()); + if (useFaultTolerance) { + emittedRecords.failRecord(record.getId()); + } + log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to " + e.getClass().getSimpleName()); } } - // TODO: Add fault tolerance + // TODO: Should we fail record at exception catch? public final void emit(StreamRecord record, int outputChannel) { record.setId(channelID); - // emittedRecords.addRecord(record, outputChannel); + if (useFaultTolerance) { + emittedRecords.addRecord(record, outputChannel); + } try { outputs.get(outputChannel).emit(record); } catch (Exception e) { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java index 753105f1e296b362cd505ffb2ae3ba713f07eec8..47d045ea7a468d9d9cad54a227135f0ebca3b70e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java @@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFactory; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.template.AbstractOutputTask; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; public class StreamSink extends AbstractOutputTask { @@ -34,6 +36,7 @@ public class StreamSink extends AbstractOutputTask { private UserSinkInvokable userFunction; private StreamComponentHelper streamSinkHelper; private String name; + private RecordInvoker invoker; public StreamSink() { // TODO: Make configuration file visible and call setClassInputs() here @@ -52,13 +55,17 @@ public class StreamSink extends AbstractOutputTask { } catch (Exception e) { log.error("Cannot register inputs", e); } + + FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0)); + invoker = streamSinkHelper.getRecordInvoker(faultToleranceType); + userFunction = streamSinkHelper.getUserFunction(taskConfiguration); } @Override public void invoke() throws Exception { log.debug("SINK " + name + " invoked"); - streamSinkHelper.invokeRecords(userFunction, inputs, name); + streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name); System.out.println("Result: "+userFunction.getResult()); log.debug("SINK " + name + " invoke finished"); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java index d917948450da378e159a80339a8972e235c7d3e9..a3c9d7addfe524b38b881f9e54898992870e4921 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java @@ -26,8 +26,10 @@ import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.examples.DummyIS; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; public class StreamSource extends AbstractInputTask { @@ -41,6 +43,8 @@ public class StreamSource extends AbstractInputTask { private int sourceInstanceID; private String name; private FaultToleranceUtil recordBuffer; + private FaultToleranceType faultToleranceType; + private RecordInvoker invoker; StreamComponentHelper streamSourceHelper; public StreamSource() { @@ -49,7 +53,7 @@ public class StreamSource extends AbstractInputTask { partitioners = new LinkedList>(); userFunction = null; streamSourceHelper = new StreamComponentHelper(); - numSources=StreamComponentHelper.newComponent(); + numSources = StreamComponentHelper.newComponent(); sourceInstanceID = numSources; } @@ -69,20 +73,23 @@ public class StreamSource extends AbstractInputTask { name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME"); try { - streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, - partitioners); + streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners); } catch (StreamComponentException e) { log.error("Cannot register outputs", e); } - - int[] numberOfOutputChannels= new int[outputs.size()]; - for(int i=0; i { userFunction.invoke(); // TODO print to file System.out.println(userFunction.getResult()); - + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java index 919e2af8582cac456663fe23586b9e703100faf8..6ac4ed1cd896082a580afcae1e8740028e86ad96 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java @@ -26,7 +26,9 @@ import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; public class StreamTask extends AbstractTask { @@ -40,7 +42,9 @@ public class StreamTask extends AbstractTask { private static int numTasks; private int taskInstanceID; private String name; - StreamComponentHelper streamTaskHelper; + private StreamComponentHelper streamTaskHelper; + private FaultToleranceType faultToleranceType; + private RecordInvoker invoker; Configuration taskConfiguration; private FaultToleranceUtil recordBuffer; @@ -73,7 +77,9 @@ public class StreamTask extends AbstractTask { numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0); } - recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels); + invoker = streamTaskHelper.setFaultTolerance(recordBuffer, faultToleranceType, + taskConfiguration, outputs, taskInstanceID, name, numberOfOutputChannels); + userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration, outputs, taskInstanceID, name, recordBuffer); @@ -84,10 +90,10 @@ public class StreamTask extends AbstractTask { @Override public void invoke() throws Exception { log.debug("TASK " + name + " invoked with instance id " + taskInstanceID); - streamTaskHelper.invokeRecords(userFunction, inputs, name); + streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name); + // TODO print to file System.out.println(userFunction.getResult()); log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID); } - -} +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java index cb97c7a3c4f6b3d9e069d0dc0baf1b13ddd64cce..dee256693490b6d41910189e4354e9aedf09d892 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java @@ -28,6 +28,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class BasicTopology { @@ -69,7 +70,7 @@ public class BasicTopology { } private static JobGraph getJobGraph() throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology", FaultToleranceType.NONE); graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1); graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1); graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/BatchForwardLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/BatchForwardLocal.java index bdede9b88797d63b0e26f75f8010154b5e26be5a..ecdeccdb753e89e8b1b2269de6feaf5d9a00bae6 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/BatchForwardLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/BatchForwardLocal.java @@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class BatchForwardLocal { public static JobGraph getJobGraph() { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("StreamSource", BatchForwardSource.class); graphBuilder.setSink("StreamSink", BatchForwardSink.class); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java index a4563d965219311631a08fa0816f1fcd09886d14..5b61f4674316f73589d3ddfae2c3ae35151f4ce1 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java @@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class BatchWordCountLocal { public static JobGraph getJobGraph() { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class); graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1); graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/cellinfo/CellInfoLocal.java index aae6ac506b36f6b9640d3d9c2d5bf0f9a9c9d104..1d1c17e3cae0152666d2c52f2a7b0f7089fe0500 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/cellinfo/CellInfoLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/cellinfo/CellInfoLocal.java @@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class CellInfoLocal { public static JobGraph getJobGraph() { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("infoSource", InfoSource.class); graphBuilder.setSource("querySource", QuerySource.class); graphBuilder.setTask("cellTask", CellTask.class, 3, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java index 2d16be13d912528ea9bef0ee2bde000b9f35be2c..e1b9f41d639ee3f48d6ed1cad69f4cad0b3b94c8 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -29,6 +29,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class IncrementalLearningSkeleton { @@ -144,7 +145,7 @@ public class IncrementalLearningSkeleton { } private static JobGraph getJobGraph() throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning", FaultToleranceType.NONE); graphBuilder.setSource("NewData", NewDataSource.class, 1, 1); graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java index 88f56789adc524fd7de99d404245e15e204e753a..440794172ad05ae884e237ff0a898360c4390419 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java @@ -33,6 +33,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class IncrementalOLS { @@ -46,7 +47,7 @@ public class IncrementalOLS { @Override public void invoke() throws Exception { record.initRecords(); - while(true) { + for (int j = 0; j < 100; j++) { // pull new record from data source record.setTuple(getNewData()); emit(record); @@ -63,7 +64,7 @@ public class IncrementalOLS { public static class TrainingDataSource extends UserSourceInvokable { - private final int BATCH_SIZE = 1000; + private final int BATCH_SIZE = 10; StreamRecord record = new StreamRecord(2, BATCH_SIZE); @@ -74,7 +75,7 @@ public class IncrementalOLS { record.initRecords(); - while(true) { + for (int j = 0; j < 1000; j++) { for (int i = 0; i < BATCH_SIZE; i++) { record.setTuple(i, getTrainingData()); } @@ -165,11 +166,12 @@ public class IncrementalOLS { @Override public void invoke(StreamRecord record) throws Exception { + System.out.println(record.getTuple()); } } private static JobGraph getJobGraph() throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS", FaultToleranceType.NONE); graphBuilder.setSource("NewData", NewDataSource.class, 1, 1); graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java index 9e73795ed097093d6be4bdf72e530aab0c50ccfb..202119a3bdb6969fe2412a9d42abefbfac477592 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java @@ -24,13 +24,14 @@ import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; //TODO: window operator remains unfinished. public class WindowWordCountLocal { public static JobGraph getJobGraph() { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class); graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1); graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java index 78a590c9e3c148b5691d23608fd9e526017773b4..6b5139899974ed84231a0b54c7fb4b344fc3b66d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java @@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class WordCountLocal { public static JobGraph getJobGraph() { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class); graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1); graphBuilder.setSink("WordCountSink", WordCountSink.class); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountRemote.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountRemote.java index 8245806d53e6692dd128d0189f9a97fcb4cfc063..e54ab0b978eb4421c5d99194f06c147e8b316753 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountRemote.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountRemote.java @@ -34,6 +34,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.PerformanceCounter; @@ -141,7 +142,7 @@ public class WordCountRemote { } private static JobGraph getJobGraph() throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1); graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1); graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java index 4002701dad3f0ee8526242094c28f6f2ec33671d..5234084ac49b3fc214ac1c03ef02899e597060d6 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java @@ -24,6 +24,7 @@ import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; public class WordCountStarter { @@ -31,7 +32,7 @@ public class WordCountStarter { private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance, int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks, int sinkSubtasksPerInstance) throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class, sourceSubtasks, sourceSubtasksPerInstance); graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks, diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceType.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceType.java new file mode 100644 index 0000000000000000000000000000000000000000..ef34c43cb25400f3aee7828bfc9f9b4fb01e5d05 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceType.java @@ -0,0 +1,24 @@ +package eu.stratosphere.streaming.faulttolerance; + +import java.util.HashMap; +import java.util.Map; + +public enum FaultToleranceType { + NONE(0), AT_LEAST_ONCE(1), EXACTLY_ONCE(2); + + public final int id; + + FaultToleranceType(int id) { + this.id = id; + } + + private static final Map map = new HashMap(); + static { + for (FaultToleranceType type : FaultToleranceType.values()) + map.put(type.id, type); + } + + public static FaultToleranceType from(int id) { + return map.get(id); + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtil.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtil.java index 9e6d5b801cc1b6b1075e9fd772c5b75f5c2fbc5d..2bd3c1b32013c64f22565b05a1ce912657c103b1 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtil.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtil.java @@ -39,10 +39,9 @@ public class FaultToleranceUtil { private final int componentID; private int numberOfChannels; - - boolean exactlyOnce; - + private FaultToleranceBuffer buffer; + public FaultToleranceType type; public PerformanceTracker tracker; public PerformanceCounter counter; @@ -57,20 +56,22 @@ public class FaultToleranceUtil { * @param numberOfChannels * Number of output channels for the output components */ - // TODO:get faulttolerancy type from user config, update logs for channel + // TODO:update logs for channel // acks and fails - public FaultToleranceUtil(List> outputs, int sourceInstanceID, + public FaultToleranceUtil(FaultToleranceType type, List> outputs, int sourceInstanceID, int[] numberOfChannels) { this.outputs = outputs; this.componentID = sourceInstanceID; - - exactlyOnce = true; - - if (exactlyOnce) { + + this.type = type; + + switch (type) { + case EXACTLY_ONCE: this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID); - } else { - this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID); + break; + case AT_LEAST_ONCE: case NONE: default: + this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID); } tracker = new PerformanceTracker("pc", 1000, 1000, 30000, @@ -80,18 +81,19 @@ public class FaultToleranceUtil { } - public FaultToleranceUtil(List> outputs, int sourceInstanceID, - String componentName, int[] numberOfChannels) { + public FaultToleranceUtil(FaultToleranceType type, List> outputs, + int sourceInstanceID, String componentName, int[] numberOfChannels) { this.outputs = outputs; - this.componentID = sourceInstanceID; - exactlyOnce = false; - - if (exactlyOnce) { - this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID); - } else { + switch (type) { + case AT_LEAST_ONCE: + default: this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID); + break; + case EXACTLY_ONCE: + this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID); + break; } tracker = new PerformanceTracker("pc", 1000, 1000, 10000, @@ -149,7 +151,7 @@ public class FaultToleranceUtil { */ public void failRecord(UID recordID, int channel) { // if by ft type - if (exactlyOnce) { + if (type == FaultToleranceType.EXACTLY_ONCE) { StreamRecord failed = buffer.failChannel(recordID, channel); if (failed != null) { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/rabbitmq/RMQTopology.java deleted file mode 100755 index c894542dcf99826b7ed2cc081a0656967f8a1466..0000000000000000000000000000000000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/rabbitmq/RMQTopology.java +++ /dev/null @@ -1,92 +0,0 @@ -package eu.stratosphere.streaming.rabbitmq; - -import java.net.InetSocketAddress; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; - -import eu.stratosphere.api.java.tuple.Tuple1; -import eu.stratosphere.client.minicluster.NepheleMiniCluster; -import eu.stratosphere.client.program.Client; -import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.nephele.jobgraph.JobGraph; -import eu.stratosphere.streaming.api.JobGraphBuilder; -import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; -import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; -import eu.stratosphere.streaming.api.streamrecord.StreamRecord; - -public class RMQTopology { - - public static class RMQSource extends UserSourceInvokable { - StreamRecord record = new StreamRecord(new Tuple1()); - - @Override - public void invoke() throws Exception { - - String QUEUE_NAME = "hello"; - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); - Connection connection = factory.newConnection(); - Channel channel = connection.createChannel(); - - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - - QueueingConsumer consumer = new QueueingConsumer(channel); - channel.basicConsume(QUEUE_NAME, true, consumer); - - while (true) { - QueueingConsumer.Delivery delivery = consumer.nextDelivery(); - String message = new String(delivery.getBody()); - if(message.equals("quit")){ - break; - } - record.setString(0, message); - emit(record); - } - connection.close(); - } - - } - - public static class Sink extends UserSinkInvokable { - - @Override - public void invoke(StreamRecord record) throws Exception { - System.out.println(record); - } - } - - private static JobGraph getJobGraph() throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ"); - graphBuilder.setSource("Source", RMQSource.class, 1, 1); - graphBuilder.setSink("Sink", Sink.class, 1, 1); - - graphBuilder.shuffleConnect("Source", "Sink"); - - return graphBuilder.getJobGraph(); - } - - public static void main(String[] args) { - - try { - JobGraph jG = getJobGraph(); - Configuration configuration = jG.getJobConfiguration(); - - System.out.println("Running in Local mode"); - NepheleMiniCluster exec = new NepheleMiniCluster(); - - exec.start(); - - Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); - - client.run(jG, true); - - exec.stop(); - } catch (Exception e) { - - } - } - -} diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java index 9660df7ed120c17f51633a2e0c33e9eb9d42906f..55a2adfde540c46c5bb6af062add8d27ee00b6cd 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentTest.java @@ -21,11 +21,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; -import junit.framework.Assert; - -import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.BeforeClass; import org.junit.Test; @@ -40,11 +36,12 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; +import eu.stratosphere.streaming.util.LogUtils; public class StreamComponentTest { private static Map data = new HashMap(); - private static boolean fPTest = true; public static class MySource extends UserSourceInvokable { public MySource() { @@ -54,7 +51,7 @@ public class StreamComponentTest { @Override public void invoke() throws Exception { - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 100; i++) { record.setField(0, i); emit(record); } @@ -92,12 +89,9 @@ public class StreamComponentTest { @BeforeClass public static void runStream() { - Logger root = Logger.getRootLogger(); - root.removeAllAppenders(); - root.addAppender(new ConsoleAppender()); - root.setLevel(Level.OFF); + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); graphBuilder.setSource("MySource", MySource.class); graphBuilder.setTask("MyTask", MyTask.class, 2, 2); graphBuilder.setSink("MySink", MySink.class); @@ -123,10 +117,7 @@ public class StreamComponentTest { @Test public void test() { - - Assert.assertTrue(fPTest); - - assertEquals(1000, data.keySet().size()); + assertEquals(100, data.keySet().size()); for (Integer k : data.keySet()) { assertEquals((Integer) (k + 1), data.get(k)); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceTypeTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceTypeTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dfbede3e5edd4b6960cfe9f76477675618582a50 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceTypeTest.java @@ -0,0 +1,15 @@ +package eu.stratosphere.streaming.faulttolerance; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class FaultToleranceTypeTest { + + @Test + public void test() { + assertEquals(FaultToleranceType.NONE, FaultToleranceType.from(0)); + assertEquals(FaultToleranceType.AT_LEAST_ONCE, FaultToleranceType.from(1)); + assertEquals(FaultToleranceType.EXACTLY_ONCE, FaultToleranceType.from(2)); + } +} diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtilTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtilTest.java index 0cb8d1a2fc1848140f7ad774214aa796850c2c67..daa725f4e79387d8375bfb90c0b365ce0945a881 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtilTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/faulttolerance/FaultToleranceUtilTest.java @@ -33,7 +33,7 @@ public class FaultToleranceUtilTest { public void setFaultTolerancyBuffer() { outputs = new LinkedList>(); int[] numOfOutputchannels = { 1, 2 }; - faultTolerancyBuffer = new FaultToleranceUtil(outputs, 1, numOfOutputchannels); + faultTolerancyBuffer = new FaultToleranceUtil(FaultToleranceType.EXACTLY_ONCE, outputs, 1, numOfOutputchannels); } @Test