提交 c2aff839 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] JavaDoc and api update

上级 03b0f8b7
......@@ -44,6 +44,7 @@ import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......@@ -61,6 +62,7 @@ public class JobGraphBuilder {
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
private FaultToleranceType faultToleranceType;
/**
* Creates a new JobGraph with the given name
......@@ -68,7 +70,7 @@ public class JobGraphBuilder {
* @param jobGraphName
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName) {
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
......@@ -76,10 +78,11 @@ public class JobGraphBuilder {
maxParallelismVertexName = "";
maxParallelism = 0;
log.debug("JobGraph created");
this.faultToleranceType = faultToleranceType;
}
/**
* Adds a source component to the JobGraph
* Adds a source component to the JobGraph with no parallelism
*
* @param sourceName
* Name of the source component
......@@ -112,6 +115,33 @@ public class JobGraphBuilder {
log.debug("SOURCE: " + sourceName);
}
/**
 * Adds a source to the JobGraph from a user defined object, running as a
 * single instance.
 * <p>
 * Delegates to the four-argument overload with a parallelism of 1 and 1
 * subtask per instance.
 *
 * @param sourceName
 *            Name of the source component
 * @param InvokableObject
 *            User defined UserSourceInvokable object or other predefined
 *            source object
 */
public void setSource(String sourceName, UserSourceInvokable InvokableObject) {
	final int parallelism = 1;
	final int subtasksPerInstance = 1;
	setSource(sourceName, InvokableObject, parallelism, subtasksPerInstance);
}
/**
* Adds source to the JobGraph by user defined object with the set
* parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableObject
* User defined UserSourceInvokable object or other predefined
* source object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSource(String sourceName, UserSourceInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
......@@ -121,7 +151,7 @@ public class JobGraphBuilder {
}
/**
* Adds a task component to the JobGraph
* Adds a task component to the JobGraph with no parallelism
*
* @param taskName
* Name of the task component
......@@ -152,6 +182,30 @@ public class JobGraphBuilder {
log.debug("TASK: " + taskName);
}
/**
 * Adds a task component to the JobGraph from a user defined object, running
 * as a single instance.
 * <p>
 * Delegates to the four-argument overload with a parallelism of 1 and 1
 * subtask per instance.
 *
 * @param taskName
 *            Name of the task component
 * @param TaskInvokableObject
 *            User defined UserTaskInvokable object
 */
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject) {
	final int parallelism = 1;
	final int subtasksPerInstance = 1;
	setTask(taskName, TaskInvokableObject, parallelism, subtasksPerInstance);
}
/**
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the task component
* @param TaskInvokableObject
* User defined UserTaskInvokable object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject, int parallelism,
int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
......@@ -161,7 +215,7 @@ public class JobGraphBuilder {
}
/**
* Adds a sink component to the JobGraph
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
......@@ -192,6 +246,30 @@ public class JobGraphBuilder {
log.debug("SINK: " + sinkName);
}
/**
 * Adds a sink component to the JobGraph from a user defined object, running
 * as a single instance.
 * <p>
 * Delegates to the four-argument overload with a parallelism of 1 and 1
 * subtask per instance.
 *
 * @param sinkName
 *            Name of the sink component
 * @param InvokableObject
 *            User defined UserSinkInvokable object
 */
public void setSink(String sinkName, UserSinkInvokable InvokableObject) {
	final int parallelism = 1;
	final int subtasksPerInstance = 1;
	setSink(sinkName, InvokableObject, parallelism, subtasksPerInstance);
}
/**
* Adds a sink component to the JobGraph with the given parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableObject
* User defined UserSinkInvokable object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSink(String sinkName, UserSinkInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
......@@ -200,6 +278,21 @@ public class JobGraphBuilder {
log.debug("SINK: " + sinkName);
}
/**
* Sets JobVertex configuration based on the given parameters
*
* @param componentName
* Name of the component
* @param InvokableClass
* Class of the user defined Invokable
* @param parallelism
* Number of subtasks
* @param subtasksPerInstance
* Number of subtasks per instance
* @param component
* AbstractJobVertex associated with the component
*/
private void setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
......@@ -214,6 +307,9 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("faultToleranceType", faultToleranceType.id);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
}
......@@ -242,6 +338,14 @@ public class JobGraphBuilder {
addSerializedObject(InvokableObject, component);
}
/**
* Adds serialized invokable object to the JobVertex configuration
*
* @param InvokableObject
* Invokable object to serialize
* @param component
* JobVertex to which the serialized invokable will be added
*/
private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
Configuration config = component.getConfiguration();
......@@ -298,6 +402,13 @@ public class JobGraphBuilder {
}
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Name of the component on which instance sharing is set
* @param component2
* Name of the component whose instances component1 will share
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
......@@ -305,6 +416,9 @@ public class JobGraphBuilder {
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
public void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
......@@ -421,6 +535,9 @@ public class JobGraphBuilder {
}
}
/**
* Writes number of inputs into each JobVertex's config
*/
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs",
......@@ -428,6 +545,10 @@ public class JobGraphBuilder {
}
}
/**
* Writes the number of outputs and output channels into each JobVertex's
* config
*/
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfOutputs",
......@@ -444,7 +565,9 @@ public class JobGraphBuilder {
}
/**
* @return The JobGraph object
* Returns the JobGraph
*
* @return JobGraph object
*/
public JobGraph getJobGraph() {
setAutomaticInstanceSharing();
......
......@@ -36,10 +36,12 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......@@ -53,6 +55,37 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return numComponents;
}
/**
 * Picks the record invoker that matches the fault tolerance type stored in
 * the task configuration under the key "faultToleranceType" (0 is passed as
 * the default value).
 * <p>
 * NOTE(review): the incoming {@code type} argument is overwritten with the
 * configured value before it is ever read, and the FaultToleranceUtil
 * created here is only assigned to the local {@code util} parameter. Java
 * passes references by value, so neither assignment is visible to the
 * caller; the returned invoker is the only result of this call.
 *
 * @param util
 *            Reassigned locally; the value passed in is never read
 * @param type
 *            Reassigned locally from the configuration; the value passed in
 *            is never read
 * @param config
 *            Task configuration holding the "faultToleranceType" id
 * @param outputs
 *            Output writers handed to the FaultToleranceUtil constructor
 * @param taskInstanceID
 *            Instance id handed to the FaultToleranceUtil constructor
 * @param name
 *            Component name handed to the FaultToleranceUtil constructor
 * @param numberOfOutputChannels
 *            Channel counts handed to the FaultToleranceUtil constructor
 * @return Invoker returned by getRecordInvoker for the configured type
 */
public RecordInvoker setFaultTolerance(FaultToleranceUtil util, FaultToleranceType type,
		Configuration config, List<RecordWriter<StreamRecord>> outputs, int taskInstanceID,
		String name, int[] numberOfOutputChannels) {
	final int configuredId = config.getInteger("faultToleranceType", 0);
	type = FaultToleranceType.from(configuredId);
	RecordInvoker invoker = getRecordInvoker(type);

	// getRecordInvoker has already switched on type, so type is non-null here.
	if (type == FaultToleranceType.AT_LEAST_ONCE || type == FaultToleranceType.EXACTLY_ONCE) {
		util = new FaultToleranceUtil(type, outputs, taskInstanceID, name,
				numberOfOutputChannels);
	} else {
		util = null;
	}

	return invoker;
}
/**
 * Creates the record invoker for the given fault tolerance type.
 *
 * @param type
 *            Fault tolerance type to select the invoker for; switching on
 *            a null value throws a NullPointerException
 * @return A new InvokerWithFaultTolerance for AT_LEAST_ONCE and
 *         EXACTLY_ONCE, a new Invoker otherwise
 */
public RecordInvoker getRecordInvoker(FaultToleranceType type) {
	final RecordInvoker selected;
	switch (type) {
	case EXACTLY_ONCE:
	case AT_LEAST_ONCE:
		selected = new InvokerWithFaultTolerance();
		break;
	case NONE:
	default:
		selected = new Invoker();
		break;
	}
	return selected;
}
public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
......@@ -142,6 +175,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
// TODO consider logging stack trace!
public StreamInvokableComponent getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name,
FaultToleranceUtil recordBuffer) {
......@@ -150,6 +184,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class);
StreamInvokableComponent userFunction = null;
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceBuffer", 0));
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
......@@ -158,7 +194,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamInvokableComponent) ois.readObject();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
......@@ -166,12 +202,13 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
......@@ -214,7 +251,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public void invokeRecords(RecordInvokable userFunction,
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
List<StreamRecordReader<StreamRecord>> inputs, String name) throws Exception {
List<StreamRecordReader<StreamRecord>> closedInputs = new LinkedList<StreamRecordReader<StreamRecord>>();
boolean hasInput = true;
......@@ -223,11 +261,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
for (StreamRecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
StreamRecord record = input.next();
// UID id = record.getId();
userFunction.invoke(record);
// threadSafePublish(new AckEvent(id), input);
// log.debug("ACK: " + id + " -- " + name);
invoker.call(name, userFunction, input);
} else if (input.isInputClosed()) {
closedInputs.add(input);
}
......@@ -236,4 +270,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
/**
 * Strategy for handing one record from an input reader to a user function.
 */
public interface RecordInvoker {
	/**
	 * Processes a record from the given input with the user function.
	 *
	 * @param name
	 *            Name of the calling component
	 * @param userFunction
	 *            User function to invoke
	 * @param input
	 *            Reader the record is taken from
	 * @throws Exception
	 *             Declared so implementations may propagate any failure
	 */
	void call(String name, RecordInvokable userFunction,
			StreamRecordReader<StreamRecord> input) throws Exception;
}
/**
 * Record invoker that publishes an AckEvent for each record once the user
 * function has been invoked on it.
 */
public class InvokerWithFaultTolerance implements RecordInvoker {
	/**
	 * Reads the next record, invokes the user function on it, then publishes
	 * an AckEvent carrying the record's id on the same input and logs it.
	 */
	@Override
	public void call(String name, RecordInvokable userFunction,
			StreamRecordReader<StreamRecord> input) throws Exception {
		final StreamRecord current = input.next();
		final UID recordId = current.getId();
		userFunction.invoke(current);
		// Reached only when invoke() returned without throwing.
		threadSafePublish(new AckEvent(recordId), input);
		log.debug("ACK: " + recordId + " -- " + name);
	}
}
/**
 * Record invoker that only invokes the user function; it publishes no
 * events.
 */
public static class Invoker implements RecordInvoker {
	/**
	 * Reads the next record from the input and invokes the user function on
	 * it.
	 */
	@Override
	public void call(String name, RecordInvokable userFunction,
			StreamRecordReader<StreamRecord> input) throws Exception {
		userFunction.invoke(input.next());
	}
}
}
\ No newline at end of file
......@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -38,20 +39,26 @@ public abstract class StreamInvokableComponent implements Serializable {
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords) {
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void emit(StreamRecord record) {
record.setId(channelID);
// emittedRecords.addRecord(record);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
......@@ -59,16 +66,21 @@ public abstract class StreamInvokableComponent implements Serializable {
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
emittedRecords.failRecord(record.getId());
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
// TODO: Add fault tolerance
// TODO: Should we fail record at exception catch?
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
// emittedRecords.addRecord(record, outputChannel);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
......
......@@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
......@@ -34,6 +36,7 @@ public class StreamSink extends AbstractOutputTask {
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
private RecordInvoker invoker;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -52,13 +55,17 @@ public class StreamSink extends AbstractOutputTask {
} catch (Exception e) {
log.error("Cannot register inputs", e);
}
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0));
invoker = streamSinkHelper.getRecordInvoker(faultToleranceType);
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
streamSinkHelper.invokeRecords(userFunction, inputs, name);
streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name);
System.out.println("Result: "+userFunction.getResult());
log.debug("SINK " + name + " invoke finished");
}
......
......@@ -26,8 +26,10 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> {
......@@ -41,6 +43,8 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
private int sourceInstanceID;
private String name;
private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() {
......@@ -79,8 +83,11 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID, name,
numberOfOutputChannels);
// recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name,
// numberOfOutputChannels);
invoker = streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, sourceInstanceID, name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(taskConfiguration,
outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
......
......@@ -26,7 +26,9 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask {
......@@ -40,7 +42,9 @@ public class StreamTask extends AbstractTask {
private static int numTasks;
private int taskInstanceID;
private String name;
StreamComponentHelper<StreamTask> streamTaskHelper;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer;
......@@ -73,7 +77,9 @@ public class StreamTask extends AbstractTask {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels);
invoker = streamTaskHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, taskInstanceID, name, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration,
outputs, taskInstanceID, name, recordBuffer);
......@@ -84,10 +90,10 @@ public class StreamTask extends AbstractTask {
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(userFunction, inputs, name);
streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
}
}
\ No newline at end of file
......@@ -28,6 +28,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BasicTopology {
......@@ -69,7 +70,7 @@ public class BasicTopology {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology");
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology", FaultToleranceType.NONE);
graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1);
graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1);
graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchForwardLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class CellInfoLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("infoSource", InfoSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", CellTask.class, 3, 1);
......
......@@ -29,6 +29,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
......@@ -144,7 +145,7 @@ public class IncrementalLearningSkeleton {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning", FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......
......@@ -33,6 +33,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
......@@ -169,7 +170,7 @@ public class IncrementalOLS {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS");
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS", FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......
......@@ -24,13 +24,14 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
......
......@@ -15,22 +15,20 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,1,1);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......@@ -44,39 +42,16 @@ public class WordCountLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
if (args.length == 0) {
args = new String[] { "local" };
}
client.run(jG, true);
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
}
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
} catch (Exception e) {
System.out.println(e);
}
}
......
......@@ -34,6 +34,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -141,7 +142,7 @@ public class WordCountRemote {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
......
......@@ -24,6 +24,7 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
......@@ -31,7 +32,7 @@ public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.faulttolerance;
import java.util.HashMap;
import java.util.Map;
/**
 * Enumerates the fault tolerance modes that can be selected for a streaming
 * job. Every constant carries a fixed numeric {@link #id}, and {@link #from(int)}
 * maps such an id back to its constant.
 */
public enum FaultToleranceType {
    NONE(0), AT_LEAST_ONCE(1), EXACTLY_ONCE(2);

    /** Lookup table from numeric id to constant, filled once during class initialization. */
    private static final Map<Integer, FaultToleranceType> TYPES_BY_ID = indexById();

    /** Numeric identifier of this fault tolerance type. */
    public final int id;

    FaultToleranceType(int id) {
        this.id = id;
    }

    /**
     * Builds the id-to-constant table from all declared constants.
     *
     * @return a map holding one entry per constant, keyed by its id
     */
    private static Map<Integer, FaultToleranceType> indexById() {
        Map<Integer, FaultToleranceType> index = new HashMap<Integer, FaultToleranceType>();
        for (FaultToleranceType candidate : values()) {
            index.put(candidate.id, candidate);
        }
        return index;
    }

    /**
     * Resolves a numeric id to its fault tolerance type.
     *
     * @param id
     *            numeric identifier to look up
     * @return the constant whose {@link #id} equals the argument, or
     *         {@code null} when no constant uses that id
     */
    public static FaultToleranceType from(int id) {
        return TYPES_BY_ID.get(id);
    }
}
\ No newline at end of file
......@@ -39,10 +39,9 @@ public class FaultToleranceUtil {
private final int componentID;
private int numberOfChannels;
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public FaultToleranceType type;
public PerformanceTracker tracker;
public PerformanceCounter counter;
......@@ -57,20 +56,22 @@ public class FaultToleranceUtil {
* @param numberOfChannels
* Number of output channels for the output components
*/
// TODO:get faulttolerancy type from user config, update logs for channel
// TODO:update logs for channel
// acks and fails
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = true;
if (exactlyOnce) {
this.type = type;
switch (type) {
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case AT_LEAST_ONCE: case NONE: default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
......@@ -80,18 +81,19 @@ public class FaultToleranceUtil {
}
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
String componentName, int[] numberOfChannels) {
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs,
int sourceInstanceID, String componentName, int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = false;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
switch (type) {
case AT_LEAST_ONCE:
default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
}
tracker = new PerformanceTracker("pc", 1000, 1000, 10000,
......@@ -149,7 +151,7 @@ public class FaultToleranceUtil {
*/
public void failRecord(UID recordID, int channel) {
// if by ft type
if (exactlyOnce) {
if (type == FaultToleranceType.EXACTLY_ONCE) {
StreamRecord failed = buffer.failChannel(recordID, channel);
if (failed != null) {
......
package eu.stratosphere.streaming.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Source that consumes messages from a RabbitMQ queue and emits each message
 * body as the single string field of a record.
 *
 * <p>
 * Consumption continues until a message whose text is exactly {@code "quit"}
 * arrives; that message ends the loop and is not emitted.
 */
public class RMQSource extends UserSourceInvokable {
    private static final long serialVersionUID = 1L;

    private final String QUEUE_NAME;
    private final String HOST_NAME;

    // One record instance is reused: its field 0 is overwritten for every
    // message before the record is emitted.
    StreamRecord record = new StreamRecord(new Tuple1<String>());

    /**
     * Creates a source bound to one queue on one broker host.
     *
     * @param HOST_NAME
     *            host name handed to the RabbitMQ connection factory
     * @param QUEUE_NAME
     *            name of the queue that is declared and consumed
     */
    public RMQSource(String HOST_NAME, String QUEUE_NAME) {
        this.HOST_NAME = HOST_NAME;
        this.QUEUE_NAME = QUEUE_NAME;
    }

    /**
     * Connects to the broker, declares the queue and emits every delivered
     * message until {@code "quit"} is received.
     *
     * @throws Exception
     *             if connecting, consuming or emitting fails; the connection
     *             is closed before the exception propagates
     */
    @Override
    public void invoke() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);
        Connection connection = factory.newConnection();

        // Everything after the connection is opened runs inside try/finally so
        // that a failure while consuming or emitting cannot leak the connection.
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument is the auto-acknowledge flag of basicConsume.
            channel.basicConsume(QUEUE_NAME, true, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // NOTE(review): decodes with the platform default charset;
                // confirm the encoding producers use before pinning one here.
                String message = new String(delivery.getBody());
                if (message.equals("quit")) {
                    break;
                }

                record.setString(0, message);
                emit(record);
            }
        } finally {
            connection.close();
        }
    }
}
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.rabbitmq;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
......@@ -23,14 +23,14 @@ import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class RMQTopology {
......@@ -84,8 +84,9 @@ public class RMQTopology {
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ");
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1);
......@@ -96,23 +97,8 @@ public class RMQTopology {
public static void main(String[] args) {
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
client.run(jG, true);
exec.stop();
} catch (Exception e) {
}
}
}
}
\ No newline at end of file
package eu.stratosphere.streaming.util;
import java.net.InetSocketAddress;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.ProgramInvocationException;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
/**
 * Helper methods for submitting a JobGraph either to an in-process
 * NepheleMiniCluster or to a job manager reachable at a given address.
 *
 * <p>
 * Both methods are best effort: failures are reported on standard error and
 * are not rethrown to the caller.
 */
public class ClusterUtil {

    /**
     * Executes the given JobGraph locally, on a NepheleMiniCluster
     *
     * <p>
     * The mini cluster is started, the job is submitted through a client
     * connected to localhost:6498, and the cluster is stopped afterwards, even
     * when starting the cluster or running the job fails.
     *
     * @param jobGraph
     *            job to execute
     */
    public static void runOnMiniCluster(JobGraph jobGraph) {
        System.out.println("Running on mini cluster");

        Configuration configuration = jobGraph.getJobConfiguration();

        NepheleMiniCluster exec = new NepheleMiniCluster();
        Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);

        try {
            exec.start();
            client.run(jobGraph, true);
        } catch (Exception e) {
            // Report instead of swallowing so a failed run is visible to the user.
            System.err.println("Running the job on the mini cluster failed");
            e.printStackTrace();
        } finally {
            // Stop in finally so a failed start or run does not leave the
            // mini cluster running.
            try {
                exec.stop();
            } catch (Exception e) {
                System.err.println("Stopping the mini cluster failed");
                e.printStackTrace();
            }
        }
    }

    /**
     * Submits the given JobGraph through a client connected to the given
     * address.
     *
     * @param jobGraph
     *            job to execute
     * @param IP
     *            host name or IP address the client connects to
     * @param port
     *            port the client connects to
     */
    public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
        System.out.println("Running on local cluster");
        Configuration configuration = jobGraph.getJobConfiguration();

        Client client = new Client(new InetSocketAddress(IP, port), configuration);

        try {
            client.run(jobGraph, true);
        } catch (ProgramInvocationException e) {
            // Report instead of swallowing so a failed submission is visible.
            System.err.println("Running the job on " + IP + ":" + port + " failed");
            e.printStackTrace();
        }
    }
}
......@@ -21,8 +21,6 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.log4j.Level;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -38,12 +36,13 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class StreamComponentTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
private static boolean fPTest = true;
public static class MySource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
......@@ -59,7 +58,7 @@ public class StreamComponentTest {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 100; i++) {
record.setField(0, i);
emit(record);
}
......@@ -100,7 +99,7 @@ public class StreamComponentTest {
public static class MySink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
String out;
public MySink(String out) {
......@@ -124,35 +123,20 @@ public class StreamComponentTest {
public static void runStream() {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("MySource", new MySource("source"), 1, 1);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", new MySource("source"));
graphBuilder.setTask("MyTask", new MyTask("task"), 2, 2);
graphBuilder.setSink("MySink", new MySink("sink"), 1, 1);
graphBuilder.setSink("MySink", new MySink("sink"));
graphBuilder.shuffleConnect("MySource", "MyTask");
graphBuilder.shuffleConnect("MyTask", "MySink");
JobGraph jG = graphBuilder.getJobGraph();
Configuration configuration = jG.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
try {
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} catch (Exception e) {
}
ClusterUtil.runOnMiniCluster(graphBuilder.getJobGraph());
}
@Test
public void test() {
Assert.assertTrue(fPTest);
assertEquals(1000, data.keySet().size());
assertEquals(100, data.keySet().size());
for (Integer k : data.keySet()) {
assertEquals((Integer) (k + 1), data.get(k));
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.*;
import org.junit.Test;
/**
 * Checks the id-to-constant lookup offered by {@link FaultToleranceType#from(int)}.
 */
public class FaultToleranceTypeTest {

    /**
     * Each declared id must resolve to its constant, and ids that belong to no
     * constant must resolve to {@code null}.
     */
    @Test
    public void test() {
        assertEquals(FaultToleranceType.NONE, FaultToleranceType.from(0));
        assertEquals(FaultToleranceType.AT_LEAST_ONCE, FaultToleranceType.from(1));
        assertEquals(FaultToleranceType.EXACTLY_ONCE, FaultToleranceType.from(2));

        // Ids just outside the declared range exercise the "not found" path of
        // the lookup, which answers with null rather than throwing.
        assertNull(FaultToleranceType.from(-1));
        assertNull(FaultToleranceType.from(3));
    }
}
......@@ -33,7 +33,7 @@ public class FaultToleranceUtilTest {
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
int[] numOfOutputchannels = { 1, 2 };
faultTolerancyBuffer = new FaultToleranceUtil(outputs, 1, numOfOutputchannels);
faultTolerancyBuffer = new FaultToleranceUtil(FaultToleranceType.EXACTLY_ONCE, outputs, 1, numOfOutputchannels);
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册