Commit 03b0f8b7 authored by gyfora, committed by Stephan Ewen

[streaming] added support for udf objects in jobgraphbuilder

Parent ce2d67fd
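A minimal usage sketch (not part of the commit) of the change it introduces: JobGraphBuilder previously accepted only invokable classes, which the runtime instantiated reflectively; with this commit it also accepts serializable UDF instances, so user functions can carry constructor state. The component names and constructors below are taken from the RMQ example and the component test in this diff; the surrounding setup is assumed.

JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
// New in this commit: register serializable UDF instances (with constructor state)
// instead of classes. The builder serializes each instance into the vertex
// configuration under the "serializedudf" key; StreamComponentHelper deserializes
// it on the worker and falls back to the class-based "userfunction" entry if absent.
graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1);
graphBuilder.setTask("MyTask", new MyTask("task"), 2, 2);
graphBuilder.setSink("MySink", new MySink("sink"), 1, 1);
graphBuilder.shuffleConnect("Source", "MyTask");
graphBuilder.shuffleConnect("MyTask", "MySink");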
......@@ -15,6 +15,9 @@
package eu.stratosphere.streaming.api;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -41,7 +44,6 @@ import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......@@ -59,14 +61,14 @@ public class JobGraphBuilder {
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
private FaultToleranceType faultToleranceType;
/**
* Creates a new JobGraph with the given name
*
* @param jobGraphName
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
......@@ -74,7 +76,6 @@ public class JobGraphBuilder {
maxParallelismVertexName = "";
maxParallelism = 0;
log.debug("JobGraph created");
this.faultToleranceType = faultToleranceType;
}
/**
......@@ -111,6 +112,14 @@ public class JobGraphBuilder {
log.debug("SOURCE: " + sourceName);
}
public void setSource(String sourceName, UserSourceInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableObject, parallelism, subtasksPerInstance, source);
log.debug("SOURCE: " + sourceName);
}
/**
* Adds a task component to the JobGraph
*
......@@ -143,6 +152,14 @@ public class JobGraphBuilder {
log.debug("TASK: " + taskName);
}
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject, int parallelism,
int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, TaskInvokableObject, parallelism, subtasksPerInstance, task);
log.debug("TASK: " + taskName);
}
/**
* Adds a sink component to the JobGraph
*
......@@ -175,6 +192,14 @@ public class JobGraphBuilder {
log.debug("SINK: " + sinkName);
}
public void setSink(String sinkName, UserSinkInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableObject, parallelism, subtasksPerInstance, sink);
log.debug("SINK: " + sinkName);
}
private void setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
......@@ -189,13 +214,55 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("faultToleranceType", faultToleranceType.id);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
}
private void setComponent(String componentName, UserSourceInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void setComponent(String componentName, UserTaskInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void setComponent(String componentName, UserSinkInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
Configuration config = component.getConfiguration();
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(InvokableObject);
config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) {
e.printStackTrace();
System.out.println("Serialization error " + InvokableObject.getClass());
}
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
......
......@@ -17,6 +17,6 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public interface RecordInvokable extends UserInvokable {
public interface RecordInvokable extends UserInvokable {
public void invoke(StreamRecord record) throws Exception;
}
......@@ -15,8 +15,12 @@
package eu.stratosphere.streaming.api.invokable;
public abstract class UserSinkInvokable implements RecordInvokable {
// TODO consider a common interface with StreamInvokableComponents
import java.io.Serializable;
public abstract class UserSinkInvokable implements RecordInvokable, Serializable {
private static final long serialVersionUID = 1L;
public String getResult() {
return "Override getResult() to pass your own results";
}
......
......@@ -15,7 +15,13 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable {
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable,
Serializable {
private static final long serialVersionUID = 1L;
}
......@@ -15,7 +15,12 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserTaskInvokable extends StreamInvokableComponent implements RecordInvokable {
public abstract class UserTaskInvokable extends StreamInvokableComponent implements
RecordInvokable, Serializable {
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
......@@ -15,7 +15,9 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
......@@ -34,12 +36,10 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......@@ -53,37 +53,6 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return numComponents;
}
public RecordInvoker setFaultTolerance(FaultToleranceUtil util, FaultToleranceType type,
Configuration config, List<RecordWriter<StreamRecord>> outputs, int taskInstanceID,
String name, int[] numberOfOutputChannels) {
type = FaultToleranceType.from(config.getInteger("faultToleranceType", 0));
RecordInvoker invoker = getRecordInvoker(type);
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
util = new FaultToleranceUtil(type, outputs, taskInstanceID, name,
numberOfOutputChannels);
break;
case NONE:
default:
util = null;
break;
}
return invoker;
}
public RecordInvoker getRecordInvoker(FaultToleranceType type) {
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
return new InvokerWithFaultTolerance();
case NONE:
default:
return new Invoker();
}
}
public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
......@@ -146,19 +115,33 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
if (userFunctionSerialized != null) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSinkInvokable) ois.readObject();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
// TODO consider logging stack trace!
public StreamInvokableComponent getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name,
FaultToleranceUtil recordBuffer) {
......@@ -167,19 +150,28 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class);
StreamInvokableComponent userFunction = null;
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceBuffer", 0));
try {
userFunction = userFunctionClass.newInstance();
userFunction
.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
// log.error(e.getStackTrace());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
// log.error(e.getStackTrace());
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
if (userFunctionSerialized != null) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamInvokableComponent) ois.readObject();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
......@@ -222,7 +214,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
public void invokeRecords(RecordInvokable userFunction,
List<StreamRecordReader<StreamRecord>> inputs, String name) throws Exception {
List<StreamRecordReader<StreamRecord>> closedInputs = new LinkedList<StreamRecordReader<StreamRecord>>();
boolean hasInput = true;
......@@ -231,7 +223,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
for (StreamRecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
invoker.call(name, userFunction, input);
StreamRecord record = input.next();
// UID id = record.getId();
userFunction.invoke(record);
// threadSafePublish(new AckEvent(id), input);
// log.debug("ACK: " + id + " -- " + name);
} else if (input.isInputClosed()) {
closedInputs.add(input);
}
......@@ -240,30 +236,4 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public static interface RecordInvoker {
void call(String name, RecordInvokable userFunction, StreamRecordReader<StreamRecord> input)
throws Exception;
}
public class InvokerWithFaultTolerance implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
UID id = record.getId();
userFunction.invoke(record);
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
}
}
public static class Invoker implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
userFunction.invoke(record);
}
}
}
\ No newline at end of file
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
......@@ -22,11 +23,12 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent {
public abstract class StreamInvokableComponent implements Serializable {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
......@@ -36,47 +38,37 @@ public abstract class StreamInvokableComponent {
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
String name, FaultToleranceUtil emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void emit(StreamRecord record) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
// emittedRecords.addRecord(record);
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
emittedRecords.failRecord(record.getId());
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
// TODO: Should we fail record at exception catch?
// TODO: Add fault tolerance
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
// emittedRecords.addRecord(record, outputChannel);
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
......
......@@ -24,9 +24,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
......@@ -36,7 +34,6 @@ public class StreamSink extends AbstractOutputTask {
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
private RecordInvoker invoker;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -55,17 +52,13 @@ public class StreamSink extends AbstractOutputTask {
} catch (Exception e) {
log.error("Cannot register inputs", e);
}
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0));
invoker = streamSinkHelper.getRecordInvoker(faultToleranceType);
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name);
streamSinkHelper.invokeRecords(userFunction, inputs, name);
System.out.println("Result: "+userFunction.getResult());
log.debug("SINK " + name + " invoke finished");
}
......
......@@ -26,10 +26,8 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> {
......@@ -43,8 +41,6 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
private int sourceInstanceID;
private String name;
private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() {
......@@ -83,11 +79,8 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
// recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name,
// numberOfOutputChannels);
invoker = streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, sourceInstanceID, name, numberOfOutputChannels);
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID, name,
numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(taskConfiguration,
outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
......
......@@ -26,9 +26,7 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask {
......@@ -42,9 +40,7 @@ public class StreamTask extends AbstractTask {
private static int numTasks;
private int taskInstanceID;
private String name;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
StreamComponentHelper<StreamTask> streamTaskHelper;
Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer;
......@@ -77,9 +73,7 @@ public class StreamTask extends AbstractTask {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
invoker = streamTaskHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, taskInstanceID, name, numberOfOutputChannels);
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration,
outputs, taskInstanceID, name, recordBuffer);
......@@ -90,10 +84,10 @@ public class StreamTask extends AbstractTask {
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name);
streamTaskHelper.invokeRecords(userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
}
\ No newline at end of file
}
......@@ -17,6 +17,7 @@ package eu.stratosphere.streaming.api.streamrecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
......@@ -65,6 +66,18 @@ public class UID implements IOReadableWritable, Serializable {
public void write(DataOutput out) throws IOException {
out.write(uid.array());
}
private void writeObject(ObjectOutputStream stream)
throws IOException {
stream.write(uid.array());
}
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
byte[] uidA = new byte[20];
stream.read(uidA);
uid = ByteBuffer.allocate(20).put(uidA);
}
@Override
public void read(DataInput in) throws IOException {
......
......@@ -28,7 +28,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BasicTopology {
......@@ -70,7 +69,7 @@ public class BasicTopology {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology");
graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1);
graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1);
graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1);
......
......@@ -24,13 +24,12 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchForwardLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class);
......
......@@ -24,13 +24,12 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1);
......
......@@ -24,13 +24,12 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class CellInfoLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", InfoSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", CellTask.class, 3, 1);
......
......@@ -29,7 +29,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
......@@ -145,7 +144,7 @@ public class IncrementalLearningSkeleton {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......
......@@ -33,7 +33,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
......@@ -170,7 +169,7 @@ public class IncrementalOLS {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS");
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......
......@@ -24,14 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
......
......@@ -24,14 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,1,1);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......
......@@ -34,7 +34,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -142,7 +141,7 @@ public class WordCountRemote {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
......
......@@ -24,7 +24,6 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
......@@ -32,7 +31,7 @@ public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
......
package eu.stratosphere.streaming.faulttolerance;
import java.util.HashMap;
import java.util.Map;
public enum FaultToleranceType {
NONE(0), AT_LEAST_ONCE(1), EXACTLY_ONCE(2);
public final int id;
FaultToleranceType(int id) {
this.id = id;
}
private static final Map<Integer, FaultToleranceType> map = new HashMap<Integer, FaultToleranceType>();
static {
for (FaultToleranceType type : FaultToleranceType.values())
map.put(type.id, type);
}
public static FaultToleranceType from(int id) {
return map.get(id);
}
}
\ No newline at end of file
......@@ -39,9 +39,10 @@ public class FaultToleranceUtil {
private final int componentID;
private int numberOfChannels;
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public FaultToleranceType type;
public PerformanceTracker tracker;
public PerformanceCounter counter;
......@@ -56,22 +57,20 @@ public class FaultToleranceUtil {
* @param numberOfChannels
* Number of output channels for the output components
*/
// TODO:update logs for channel
// TODO:get faulttolerancy type from user config, update logs for channel
// acks and fails
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
this.type = type;
switch (type) {
case EXACTLY_ONCE:
exactlyOnce = true;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case AT_LEAST_ONCE: case NONE: default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
......@@ -81,19 +80,18 @@ public class FaultToleranceUtil {
}
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs,
int sourceInstanceID, String componentName, int[] numberOfChannels) {
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
String componentName, int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
switch (type) {
case AT_LEAST_ONCE:
default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case EXACTLY_ONCE:
exactlyOnce = false;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 10000,
......@@ -151,7 +149,7 @@ public class FaultToleranceUtil {
*/
public void failRecord(UID recordID, int channel) {
// if by ft type
if (type == FaultToleranceType.EXACTLY_ONCE) {
if (exactlyOnce) {
StreamRecord failed = buffer.failChannel(recordID, channel);
if (failed != null) {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import java.net.InetSocketAddress;
......@@ -16,52 +31,62 @@ import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class RMQTopology {
public static class RMQSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private final String QUEUE_NAME;
private final String HOST_NAME;
StreamRecord record = new StreamRecord(new Tuple1<String>());
public RMQSource(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
@Override
public void invoke() throws Exception {
String QUEUE_NAME = "hello";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
if(message.equals("quit")){
break;
}
record.setString(0, message);
emit(record);
}
connection.close();
factory.setHost(HOST_NAME);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
if (message.equals("quit")) {
break;
}
record.setString(0, message);
emit(record);
}
connection.close();
}
}
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record);
System.out.println(record.getString(0));
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE);
graphBuilder.setSource("Source", RMQSource.class, 1, 1);
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ");
graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1);
graphBuilder.shuffleConnect("Source", "Sink");
......
......@@ -21,6 +21,8 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.log4j.Level;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -36,30 +38,50 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class StreamComponentTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
private static boolean fPTest = true;
public static class MySource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
String out;
public MySource() {
}
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
public MySource(String string) {
out = string;
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 1000; i++) {
record.setField(0, i);
emit(record);
}
}
@Override
public String getResult() {
return out;
}
}
public static class MyTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
String out;
public MyTask() {
}
public MyTask(String string) {
out = string;
}
@Override
......@@ -68,10 +90,21 @@ public class StreamComponentTest {
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(i, i + 1)));
}
@Override
public String getResult() {
return out;
}
}
public static class MySink extends UserSinkInvokable {
public MySink() {
private static final long serialVersionUID = 1L;
String out;
public MySink(String out) {
this.out = out;
}
@Override
......@@ -83,18 +116,18 @@ public class StreamComponentTest {
@Override
public String getResult() {
return "";
return out;
}
}
@BeforeClass
public static void runStream() {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", MySource.class);
graphBuilder.setTask("MyTask", MyTask.class, 2, 2);
graphBuilder.setSink("MySink", MySink.class);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("MySource", new MySource("source"), 1, 1);
graphBuilder.setTask("MyTask", new MyTask("task"), 2, 2);
graphBuilder.setSink("MySink", new MySink("sink"), 1, 1);
graphBuilder.shuffleConnect("MySource", "MyTask");
graphBuilder.shuffleConnect("MyTask", "MySink");
......@@ -111,13 +144,15 @@ public class StreamComponentTest {
exec.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void test() {
assertEquals(100, data.keySet().size());
Assert.assertTrue(fPTest);
assertEquals(1000, data.keySet().size());
for (Integer k : data.keySet()) {
assertEquals((Integer) (k + 1), data.get(k));
......
package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.*;
import org.junit.Test;
public class FaultToleranceTypeTest {
@Test
public void test() {
assertEquals(FaultToleranceType.NONE, FaultToleranceType.from(0));
assertEquals(FaultToleranceType.AT_LEAST_ONCE, FaultToleranceType.from(1));
assertEquals(FaultToleranceType.EXACTLY_ONCE, FaultToleranceType.from(2));
}
}
......@@ -33,7 +33,7 @@ public class FaultToleranceUtilTest {
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
int[] numOfOutputchannels = { 1, 2 };
faultTolerancyBuffer = new FaultToleranceUtil(FaultToleranceType.EXACTLY_ONCE, outputs, 1, numOfOutputchannels);
faultTolerancyBuffer = new FaultToleranceUtil(outputs, 1, numOfOutputchannels);
}
@Test
......