Commit 716aaa87 authored by Yingjun Wu, committed by Stephan Ewen

[streaming] add clear function for StreamRecord, add framework of checkpointer.

Parent db00b861
......@@ -78,11 +78,6 @@
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
<build>
......
......@@ -15,9 +15,6 @@
package eu.stratosphere.streaming.api;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -44,7 +41,6 @@ import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......@@ -62,7 +58,6 @@ public class JobGraphBuilder {
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
private FaultToleranceType faultToleranceType;
/**
* Creates a new JobGraph with the given name
......@@ -70,7 +65,7 @@ public class JobGraphBuilder {
* @param jobGraphName
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
......@@ -78,11 +73,10 @@ public class JobGraphBuilder {
maxParallelismVertexName = "";
maxParallelism = 0;
log.debug("JobGraph created");
this.faultToleranceType = faultToleranceType;
}
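With this change the builder no longer takes a fault-tolerance mode. A minimal call-site sketch, assuming the class-based setSource/setTask/setSink overloads that this commit keeps (component names are hypothetical):

    // Build a job graph without the removed FaultToleranceType argument
    JobGraphBuilder graphBuilder = new JobGraphBuilder("myStreamingJob");
    graphBuilder.setSource("MySource", MySource.class, 1, 1); // hypothetical source class
    graphBuilder.setTask("MyTask", MyTask.class, 1, 1);       // hypothetical task class
    graphBuilder.setSink("MySink", MySink.class, 1, 1);       // hypothetical sink class
    JobGraph jobGraph = graphBuilder.getJobGraph();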
/**
* Adds a source component to the JobGraph with no parallelism
* Adds a source component to the JobGraph
*
* @param sourceName
* Name of the source component
......@@ -116,42 +110,7 @@ public class JobGraphBuilder {
}
/**
* Adds source to the JobGraph by user defined object with no parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableObject
* User defined UserSourceInvokable object or other predefined
* source object
*/
public void setSource(String sourceName, UserSourceInvokable InvokableObject) {
setSource(sourceName, InvokableObject, 1, 1);
}
/**
* Adds source to the JobGraph by user defined object with the set
* parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableObject
* User defined UserSourceInvokable object or other predefined
* source object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSource(String sourceName, UserSourceInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableObject, parallelism, subtasksPerInstance, source);
log.debug("SOURCE: " + sourceName);
}
/**
* Adds a task component to the JobGraph with no parallelism
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the task component
......@@ -183,39 +142,7 @@ public class JobGraphBuilder {
}
/**
* Adds a task component to the JobGraph with no parallelism
*
* @param taskName
* Name of the task component
* @param TaskInvokableObject
* User defined UserTaskInvokable object
*/
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject) {
setTask(taskName, TaskInvokableObject, 1, 1);
}
/**
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the task component
* @param TaskInvokableObject
* User defined UserTaskInvokable object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject, int parallelism,
int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, TaskInvokableObject, parallelism, subtasksPerInstance, task);
log.debug("TASK: " + taskName);
}
/**
* Adds a sink component to the JobGraph with no parallelism
* Adds a sink component to the JobGraph
*
* @param sinkName
* Name of the sink component
......@@ -246,53 +173,6 @@ public class JobGraphBuilder {
log.debug("SINK: " + sinkName);
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableObject
* User defined UserSinkInvokable object
*/
public void setSink(String sinkName, UserSinkInvokable InvokableObject) {
setSink(sinkName, InvokableObject, 1, 1);
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableObject
* User defined UserSinkInvokable object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSink(String sinkName, UserSinkInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableObject, parallelism, subtasksPerInstance, sink);
log.debug("SINK: " + sinkName);
}
/**
* Sets JobVertex configuration based on the given parameters
*
* @param componentName
* Name of the component
* @param InvokableClass
* Class of the user defined Invokable
* @param parallelism
* Number of subtasks
* @param subtasksPerInstance
* Number of subtasks per instance
* @param component
* AbstractJobVertex associated with the component
*/
private void setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
......@@ -307,66 +187,10 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("faultToleranceType", faultToleranceType.id);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
}
private void setComponent(String componentName, UserSourceInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void setComponent(String componentName, UserTaskInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void setComponent(String componentName, UserSinkInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
/**
* Adds serialized invokable object to the JobVertex configuration
*
* @param InvokableObject
* Invokable object to serialize
* @param component
* JobVertex to which the serialized invokable will be added
*/
private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
Configuration config = component.getConfiguration();
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(InvokableObject);
config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) {
e.printStackTrace();
System.out.println("Serialization error " + InvokableObject.getClass());
}
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
......@@ -402,13 +226,6 @@ public class JobGraphBuilder {
}
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Share will be called on this component
* @param component2
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
......@@ -416,9 +233,6 @@ public class JobGraphBuilder {
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
public void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
......@@ -535,9 +349,6 @@ public class JobGraphBuilder {
}
}
/**
* Writes number of inputs into each JobVertex's config
*/
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs",
......@@ -545,10 +356,6 @@ public class JobGraphBuilder {
}
}
/**
* Writes the number of outputs and output channels into each JobVertex's
* config
*/
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfOutputs",
......@@ -565,9 +372,7 @@ public class JobGraphBuilder {
}
/**
* Returns the JobGraph
*
* @return JobGraph object
* @return The JobGraph object
*/
public JobGraph getJobGraph() {
setAutomaticInstanceSharing();
......
......@@ -19,10 +19,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class DefaultSinkInvokable extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
@Override
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0, 0);
System.out.println(value.getValue());
......
......@@ -20,16 +20,14 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultSourceInvokable extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private String motto = "Stratosphere -- Big Data looks tiny from here";
private String[] mottoArray = motto.split(" ");
private String motto = "Stratosphere -- Big Data looks tiny from here";
private String[] mottoArray = motto.split(" ");
@Override
public void invoke() throws Exception {
for (String word : mottoArray) {
emit(new StreamRecord(new Tuple1<String>(word)));
}
}
@Override
public void invoke() throws Exception {
for (String word : mottoArray) {
emit( new StreamRecord(new Tuple1<String>(word)));
}
}
}
......@@ -18,9 +18,6 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultTaskInvokable extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
emit(record);
......
......@@ -17,6 +17,6 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public interface RecordInvokable extends UserInvokable {
public interface RecordInvokable extends UserInvokable {
public void invoke(StreamRecord record) throws Exception;
}
......@@ -15,12 +15,8 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
public abstract class UserSinkInvokable implements RecordInvokable, Serializable {
private static final long serialVersionUID = 1L;
public abstract class UserSinkInvokable implements RecordInvokable {
// TODO consider a common interface with StreamInvokableComponents
public String getResult() {
return "Override getResult() to pass your own results";
}
......
......@@ -15,13 +15,7 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable,
Serializable {
private static final long serialVersionUID = 1L;
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable {
}
......@@ -15,12 +15,7 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserTaskInvokable extends StreamInvokableComponent implements
RecordInvokable, Serializable {
private static final long serialVersionUID = 1L;
public abstract class UserTaskInvokable extends StreamInvokableComponent implements RecordInvokable {
}
\ No newline at end of file
......@@ -15,9 +15,7 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
......@@ -36,12 +34,10 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......@@ -55,37 +51,6 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return numComponents;
}
public RecordInvoker setFaultTolerance(FaultToleranceUtil util, FaultToleranceType type,
Configuration config, List<RecordWriter<StreamRecord>> outputs, int taskInstanceID,
String name, int[] numberOfOutputChannels) {
type = FaultToleranceType.from(config.getInteger("faultToleranceType", 0));
RecordInvoker invoker = getRecordInvoker(type);
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
util = new FaultToleranceUtil(type, outputs, taskInstanceID, name,
numberOfOutputChannels);
break;
case NONE:
default:
util = null;
break;
}
return invoker;
}
public RecordInvoker getRecordInvoker(FaultToleranceType type) {
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
return new InvokerWithFaultTolerance();
case NONE:
default:
return new Invoker();
}
}
public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
......@@ -110,25 +75,22 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<StreamRecordReader<StreamRecord>> inputs) throws StreamComponentException {
public void setConfigInputs(T taskBase, Configuration taskConfiguration, List<StreamRecordReader<StreamRecord>> inputs)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase,
StreamRecord.class));
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase, StreamRecord.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase,
StreamRecord.class));
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase, StreamRecord.class));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
}
}
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
public void setConfigOutputs(T taskBase, Configuration taskConfiguration, List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
......@@ -136,11 +98,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase, StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase, StreamRecord.class,
outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
......@@ -149,66 +110,33 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
if (userFunctionSerialized != null) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSinkInvokable) ois.readObject();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
return userFunction;
}
// TODO consider logging stack trace!
public StreamInvokableComponent getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name,
FaultToleranceUtil recordBuffer) {
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name, FaultToleranceUtil recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class);
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultTaskInvokable.class, StreamInvokableComponent.class);
StreamInvokableComponent userFunction = null;
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceBuffer", 0));
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
if (userFunctionSerialized != null) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamInvokableComponent) ois.readObject();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
return userFunction;
}
......@@ -230,30 +158,27 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass("partitionerClass_"
+ nrOutput, DefaultPartitioner.class, ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration
.getInteger("partitionerIntParam_" + nrOutput, 1);
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
partitioners.add(partitioner.newInstance());
}
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput + " outputs");
} catch (Exception e) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with "
+ nrOutput + " outputs", e);
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs", e);
}
}
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
List<StreamRecordReader<StreamRecord>> inputs, String name) throws Exception {
public void invokeRecords(RecordInvokable userFunction, List<StreamRecordReader<StreamRecord>> inputs, String name)
throws Exception {
List<StreamRecordReader<StreamRecord>> closedInputs = new LinkedList<StreamRecordReader<StreamRecord>>();
boolean hasInput = true;
while (hasInput) {
......@@ -261,8 +186,13 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
for (StreamRecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
invoker.call(name, userFunction, input);
} else if (input.isInputClosed()) {
StreamRecord record = input.next();
// UID id = record.getId();
userFunction.invoke(record);
// threadSafePublish(new AckEvent(id), input);
// log.debug("ACK: " + id + " -- " + name);
}
else if (input.isInputClosed()) {
closedInputs.add(input);
}
}
......@@ -270,30 +200,4 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public static interface RecordInvoker {
void call(String name, RecordInvokable userFunction, StreamRecordReader<StreamRecord> input)
throws Exception;
}
public class InvokerWithFaultTolerance implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
UID id = record.getId();
userFunction.invoke(record);
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
}
}
public static class Invoker implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
userFunction.invoke(record);
}
}
}
\ No newline at end of file
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
......@@ -23,13 +22,10 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent implements Serializable {
private static final long serialVersionUID = 1L;
public abstract class StreamInvokableComponent {
private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
......@@ -39,48 +35,36 @@ public abstract class StreamInvokableComponent implements Serializable {
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
String name, FaultToleranceUtil emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void emit(StreamRecord record) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
// emittedRecords.addRecord(record);
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
emittedRecords.failRecord(record.getId());
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
// TODO: Should we fail record at exception catch?
// TODO: Add fault tolerance
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
// emittedRecords.addRecord(record, outputChannel);
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
......
......@@ -24,9 +24,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
......@@ -36,7 +34,6 @@ public class StreamSink extends AbstractOutputTask {
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
private RecordInvoker invoker;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -55,17 +52,13 @@ public class StreamSink extends AbstractOutputTask {
} catch (Exception e) {
log.error("Cannot register inputs", e);
}
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0));
invoker = streamSinkHelper.getRecordInvoker(faultToleranceType);
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name);
streamSinkHelper.invokeRecords(userFunction, inputs, name);
System.out.println("Result: "+userFunction.getResult());
log.debug("SINK " + name + " invoke finished");
}
......
......@@ -26,10 +26,8 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> {
......@@ -43,8 +41,6 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
private int sourceInstanceID;
private String name;
private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() {
......@@ -53,7 +49,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
streamSourceHelper = new StreamComponentHelper<StreamSource>();
numSources = StreamComponentHelper.newComponent();
numSources=StreamComponentHelper.newComponent();
sourceInstanceID = numSources;
}
......@@ -73,23 +69,20 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
} catch (StreamComponentException e) {
log.error("Cannot register outputs", e);
}
int[] numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
int[] numberOfOutputChannels= new int[outputs.size()];
for(int i=0; i<numberOfOutputChannels.length;i++ ){
numberOfOutputChannels[i]=taskConfiguration.getInteger("channels_"+i, 0);
}
// recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name,
// numberOfOutputChannels);
invoker = streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, sourceInstanceID, name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(taskConfiguration,
outputs, sourceInstanceID, name, recordBuffer);
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(
taskConfiguration, outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
}
......@@ -100,7 +93,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
userFunction.invoke();
// TODO print to file
System.out.println(userFunction.getResult());
}
}
......@@ -26,9 +26,7 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask {
......@@ -42,9 +40,7 @@ public class StreamTask extends AbstractTask {
private static int numTasks;
private int taskInstanceID;
private String name;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
StreamComponentHelper<StreamTask> streamTaskHelper;
Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer;
......@@ -77,9 +73,7 @@ public class StreamTask extends AbstractTask {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
invoker = streamTaskHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, taskInstanceID, name, numberOfOutputChannels);
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration,
outputs, taskInstanceID, name, recordBuffer);
......@@ -90,10 +84,10 @@ public class StreamTask extends AbstractTask {
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name);
streamTaskHelper.invokeRecords(userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
}
\ No newline at end of file
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
public class StreamWindowTask extends UserTaskInvokable {
private int computeGranularity;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
public StreamWindowTask(int windowSize, int slidingStep,
int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record){}
private void decrementCompute(StreamRecord record){}
private void produceRecord(long progress){}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress > nextTimestamp) {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addTuple(record.getTuple(i));
}
}
}
}
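The StreamWindowTask added above is a skeleton: incrementCompute, decrementCompute and produceRecord are left empty for concrete window tasks to fill in (WindowSumAggregate at the end of this diff follows the same pattern and hard-codes the same values). A minimal instantiation sketch using those values:

    // 100-unit window, sliding by 20, pre-aggregated every 10 units, timestamp in field 1
    StreamWindowTask windowTask = new StreamWindowTask(100, 20, 10, 1);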
......@@ -86,6 +86,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
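The copy constructor added above copies every tuple via copyTuple and duplicates the 20-byte UID, so the new record does not share state with the original. A usage sketch:

    StreamRecord original = new StreamRecord(new Tuple1<String>("a"));
    StreamRecord copy = new StreamRecord(original);  // copied tuples and UID
    copy.addTuple(new Tuple1<String>("b"));          // leaves 'original' untouched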
/**
* Creates empty StreamRecord with number of fields set
*
......@@ -95,7 +105,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord(int numOfFields) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.batchSize = 1;
tupleBatch = new ArrayList<Tuple>(batchSize);
}
/**
......@@ -113,16 +124,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch = new ArrayList<Tuple>(batchSize);
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates a new batch of records containing only the given Tuple as element
* and sets desired batch size.
......@@ -147,7 +148,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tupleList
* Tuples to be stored in the StreamRecord
*/
public StreamRecord(List<Tuple> tupleList) {
numOfFields = tupleList.get(0).getArity();
numOfTuples = tupleList.size();
......@@ -166,6 +166,67 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this(tuple, 1);
}
/**
* Remove all the contents inside StreamRecord.
*/
public void Clear(){
this.numOfTuples = 0;
tupleBatch.clear();
}
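This is the clear function named in the commit message: it resets the tuple count and empties the batch so a record object can be reused. A short sketch:

    StreamRecord record = new StreamRecord(1);     // records with one field per tuple
    record.addTuple(new Tuple1<String>("hello"));
    record.Clear();                                // drop all buffered tuples
    assert record.isEmpty();                       // numOfTuples is back to 0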
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
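addTuple and removeTuple (relocated by this diff) enforce the batch's fixed arity and keep numOfTuples in sync. A combined sketch, assuming two-field tuples:

    StreamRecord batch = new StreamRecord(2);                // tuples must have arity 2
    batch.addTuple(new Tuple2<String, Integer>("k", 1));     // append at the end
    batch.addTuple(0, new Tuple2<String, Integer>("j", 0));  // insert at position 0
    Tuple removed = batch.removeTuple(1);                    // removes and returns ("k", 1)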
public boolean isEmpty() {
return (this.numOfTuples == 0);
}
......@@ -291,15 +352,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
// TODO: add exception for cast for all getters
public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Boolean) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Boolean) getField(tupleNumber, fieldNumber);
}
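As with all the typed getters below, the commit drops the try/catch that converted a ClassCastException into a FieldTypeMismatchException, so a type mismatch now surfaces as a raw ClassCastException. Caller sketch:

    // Throws ClassCastException if field 1 of tuple 0 is not a Boolean
    Boolean flag = record.getBoolean(0, 1);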
/**
* Get a Byte from thne given field of the first Tuple of the batch
* Get a Byte from the given field of the first Tuple of the batch
*
* @param fieldNumber
* Position of the field in the tuple
......@@ -324,11 +381,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Byte) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Byte) getField(tupleNumber, fieldNumber);
}
/**
......@@ -358,11 +411,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Character) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Character) getField(tupleNumber, fieldNumber);
}
/**
......@@ -391,11 +440,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Double) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Double) getField(tupleNumber, fieldNumber);
}
/**
......@@ -424,11 +469,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Float) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Float) getField(tupleNumber, fieldNumber);
}
/**
......@@ -457,11 +498,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Integer) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Integer) getField(tupleNumber, fieldNumber);
}
/**
......@@ -490,11 +527,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Long) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Long) getField(tupleNumber, fieldNumber);
}
/**
......@@ -523,11 +556,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (Short) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (Short) getField(tupleNumber, fieldNumber);
}
/**
......@@ -554,11 +583,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
try {
return (String) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
return (String) getField(tupleNumber, fieldNumber);
}
/**
......@@ -587,7 +612,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
// TODO: consider interaction with batch size
// TODO: consider no such tuple exception and interaction with batch size
public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException {
try {
tupleBatch.get(tupleNumber).setField(o, fieldNumber);
......@@ -977,57 +1002,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
* deserializing it
......
......@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamrecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
......@@ -66,18 +65,6 @@ public class UID implements IOReadableWritable, Serializable {
public void write(DataOutput out) throws IOException {
out.write(uid.array());
}
private void writeObject(ObjectOutputStream stream)
throws IOException {
stream.write(uid.array());
}
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
byte[] uidA = new byte[20];
stream.read(uidA);
uid = ByteBuffer.allocate(20).put(uidA);
}
@Override
public void read(DataInput in) throws IOException {
......
......@@ -28,7 +28,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BasicTopology {
......@@ -70,7 +69,7 @@ public class BasicTopology {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology");
graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1);
graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1);
graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1);
......
......@@ -24,13 +24,12 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchForwardLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class);
......
......@@ -24,13 +24,12 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1);
......
......@@ -24,13 +24,12 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class CellInfoLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", InfoSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", CellTask.class, 3, 1);
......
......@@ -14,18 +14,21 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
......@@ -33,8 +36,6 @@ public class IncrementalLearningSkeleton {
// Source for feeding new data for prediction
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>(1));
@Override
......@@ -56,8 +57,6 @@ public class IncrementalLearningSkeleton {
// Source for feeding new training data for partial model building
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// Number of tuples grouped for building partial model
private final int BATCH_SIZE = 1000;
......@@ -89,8 +88,6 @@ public class IncrementalLearningSkeleton {
// Task for building up-to-date partial models on new training data
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
......@@ -107,8 +104,6 @@ public class IncrementalLearningSkeleton {
// batch-processing and the up-to-date partial model
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
StreamRecord batchModel = null;
StreamRecord partialModel = null;
......@@ -142,17 +137,14 @@ public class IncrementalLearningSkeleton {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// do nothing
}
}
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning",
FaultToleranceType.NONE);
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......@@ -173,16 +165,32 @@ public class IncrementalLearningSkeleton {
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
if (args.length == 0) {
args = new String[] { "local" };
}
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
\ No newline at end of file
......@@ -14,6 +14,7 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml;
import java.net.InetSocketAddress;
import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
......@@ -23,22 +24,21 @@ import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(2, 1);
Random rnd = new Random();
......@@ -46,7 +46,7 @@ public class IncrementalOLS {
@Override
public void invoke() throws Exception {
record.initRecords();
while (true) {
for (int j = 0; j < 100; j++) {
// pull new record from data source
record.setTuple(getNewData());
emit(record);
......@@ -63,9 +63,7 @@ public class IncrementalOLS {
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private final int BATCH_SIZE = 1000;
private final int BATCH_SIZE = 10;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
......@@ -76,7 +74,7 @@ public class IncrementalOLS {
record.initRecords();
while (true) {
for (int j = 0; j < 1000; j++) {
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
......@@ -95,8 +93,6 @@ public class IncrementalOLS {
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
......@@ -113,8 +109,8 @@ public class IncrementalOLS {
for (int i = 0; i < numOfTuples; i++) {
Tuple t = record.getTuple(i);
Double[] x_i = (Double[]) t.getField(1);
y[i] = (Double) t.getField(0);
Double[] x_i = t.getField(1);
y[i] = t.getField(0);
for (int j = 0; j < numOfFeatures; j++) {
x[i][j] = x_i[j];
}
......@@ -130,8 +126,6 @@ public class IncrementalOLS {
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
// StreamRecord batchModel = null;
Double[] partialModel = new Double[] { 0.0, 0.0 };
......@@ -169,16 +163,14 @@ public class IncrementalOLS {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record.getTuple());
}
}
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS",
FaultToleranceType.NONE);
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS");
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
......@@ -197,18 +189,34 @@ public class IncrementalOLS {
public static void main(String[] args) {
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
if (args.length == 0) {
args = new String[] { "local" };
}
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
public class WindowSumAggregate extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
public WindowSumAggregate() {
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
sum.put("sum", sum.get("sum") + number);
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
sum.put("sum", sum.get("sum") - number);
}
}
private void produceRecord(long progress){
outRecord.setInteger(0, sum.get("sum"));
outRecord.setLong(1, progress);
emit(outRecord);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
}
tempRecord.addTuple(record.getTuple(i));
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowSumLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowSumSource", WindowSumSource.class);
graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1);
graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1);
graphBuilder.setSink("WindowSumSink", WindowSumSink.class);
graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple");
graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate");
graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumMultiple extends UserTaskInvokable {
private StreamRecord outputRecord = new StreamRecord(new Tuple2<Integer, Long>());
@Override
public void invoke(StreamRecord record) throws Exception {
Integer number = record.getInteger(0);
Long timestamp = record.getLong(1);
outputRecord.setInteger(0, number+1);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class RMQTopology {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record.getString(0));
}
}
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1);
graphBuilder.shuffleConnect("Source", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSink extends UserSinkInvokable {
private Integer sum = 0;
private long timestamp = 0;
@Override
public void invoke(StreamRecord record) throws Exception {
sum = record.getInteger(0);
timestamp = record.getLong(1);
System.out.println("============================================");
System.out.println(sum + " " + timestamp);
System.out.println("============================================");
}
}
......@@ -13,27 +13,25 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.faulttolerance;
package eu.stratosphere.streaming.examples.window.sum;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public enum FaultToleranceType {
NONE(0), AT_LEAST_ONCE(1), EXACTLY_ONCE(2);
public class WindowSumSource extends UserSourceInvokable {
public final int id;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
private Long timestamp = 0L;
FaultToleranceType(int id) {
this.id = id;
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; ++i) {
outRecord.setInteger(0, i);
outRecord.setLong(1, timestamp);
timestamp++;
emit(outRecord);
}
}
private static final Map<Integer, FaultToleranceType> map = new HashMap<Integer, FaultToleranceType>();
static {
for (FaultToleranceType type : FaultToleranceType.values())
map.put(type.id, type);
}
public static FaultToleranceType from(int id) {
return map.get(id);
}
}
\ No newline at end of file
}
......@@ -25,38 +25,33 @@ import eu.stratosphere.streaming.state.WindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private int windowSize=10;
private int slidingStep=2;
private int computeGranularity=1;
private int windowFieldId=2;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private MutableTableState<String, Integer> wordCounts;
private String word = "";
private Integer count = 0;
private long initTimestamp=-1;
private long nextTimestamp=-1;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
computeGranularity = 10;
windowFieldId = 2;
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity, windowFieldId);
computeGranularity);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
String word = record.getString(i, 0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
int count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
}
......@@ -65,8 +60,8 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = wordCounts.get(word) - 1;
String word = record.getString(i, 0);
int count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
......@@ -75,39 +70,51 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
private void produceRecord(long progress){
outRecord.Clear();
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
@Override
public void invoke(StreamRecord record) throws Exception {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(record);
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if (window.isFull()) {
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
emit(outRecord);
}
tempRecord.addTuple(record.getTuple(i));
}
}
}
......@@ -24,14 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
......
......@@ -41,14 +41,13 @@ public class WindowWordCountSource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for(int i=0; i<10; ++i) {
while(true){
line = br.readLine();
if(line==null){
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
System.out.println("line="+line);
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
timestamp++;
......
......@@ -21,16 +21,14 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp = 0L;
private StreamRecord outputRecord = new StreamRecord(3);
@Override
public void invoke(StreamRecord record) throws Exception {
outputRecord.Clear();
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("sentence=" + record.getString(0) + ", timestamp="
+ record.getLong(1));
for (String word : words) {
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
......
......@@ -15,22 +15,24 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", new WordCountSourceSplitter("src/test/resources/testdata/hamlet.txt"));
graphBuilder.setTask("WordCountCounter", new WordCountCounter());
graphBuilder.setSink("WordCountSink", new WordCountSink());
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
......@@ -42,15 +44,39 @@ public class WordCountLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
if (args.length == 0) {
args = new String[] { "local" };
}
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
}
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
} catch (Exception e) {
System.out.println(e);
}
}
......
......@@ -34,7 +34,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -142,7 +141,7 @@ public class WordCountRemote {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
......
......@@ -20,9 +20,20 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
......@@ -23,27 +23,22 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSourceSplitter extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = new String();
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private String fileName;
public WordCountSourceSplitter(String fileName) {
this.fileName = fileName;
}
@Override
public void invoke() throws Exception {
br = new BufferedReader(new FileReader(fileName));
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
while (true) {
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
line = line.replaceAll("[\\-\\+\\.\\^:,]", "");
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
for (String word : line.split(" ")) {
outRecord.setString(0, word);
System.out.println("word=" + word);
......
......@@ -24,7 +24,6 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
......@@ -32,7 +31,7 @@ public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
......
......@@ -39,9 +39,10 @@ public class FaultToleranceUtil {
private final int componentID;
private int numberOfChannels;
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public FaultToleranceType type;
public PerformanceTracker tracker;
public PerformanceCounter counter;
......@@ -56,22 +57,20 @@ public class FaultToleranceUtil {
* @param numberOfChannels
* Number of output channels for the output components
*/
// TODO:update logs for channel
// TODO: get fault tolerance type from user config, update logs for channel
// acks and fails
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
this.type = type;
switch (type) {
case EXACTLY_ONCE:
exactlyOnce = true;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case AT_LEAST_ONCE: case NONE: default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
......@@ -81,19 +80,18 @@ public class FaultToleranceUtil {
}
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs,
int sourceInstanceID, String componentName, int[] numberOfChannels) {
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
String componentName, int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
switch (type) {
case AT_LEAST_ONCE:
default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case EXACTLY_ONCE:
exactlyOnce = false;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 10000,
......@@ -151,7 +149,7 @@ public class FaultToleranceUtil {
*/
public void failRecord(UID recordID, int channel) {
// branch on the configured fault tolerance type
if (type == FaultToleranceType.EXACTLY_ONCE) {
if (exactlyOnce) {
StreamRecord failed = buffer.failChannel(recordID, channel);
if (failed != null) {
......
package eu.stratosphere.streaming.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
* Source for reading messages from a RabbitMQ queue. The source currently only supports string messages; other types will be added soon.
*
*/
public class RMQSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private final String QUEUE_NAME;
private final String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
private transient String message;
StreamRecord record = new StreamRecord(new Tuple1<String>());
public RMQSource(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
private void initializeConnection() {
factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException e) {
}
}
@Override
public void invoke() {
initializeConnection();
while (true) {
try {
delivery = consumer.nextDelivery();
} catch (ShutdownSignalException e) {
e.printStackTrace();
break;
} catch (ConsumerCancelledException e) {
e.printStackTrace();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
message = new String(delivery.getBody());
if (message.equals("quit")) {
break;
}
record.setString(0, message);
emit(record);
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
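For reference, a minimal producer sketch that would feed this source, assuming the standard RabbitMQ Java client (com.rabbitmq:amqp-client). The host and queue name mirror the RMQTopology example above; everything else is illustrative. The final "quit" message is what makes RMQSource break out of its consume loop and close the connection.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RMQProducerSketch {
	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// declare the same non-durable queue that RMQSource consumes from
		channel.queueDeclare("hello", false, false, false, null);
		channel.basicPublish("", "hello", null, "hello stratosphere".getBytes());
		// RMQSource stops consuming when it receives this message
		channel.basicPublish("", "hello", null, "quit".getBytes());
		connection.close();
	}
}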
......@@ -15,12 +15,9 @@
package eu.stratosphere.streaming.state;
import java.util.HashMap;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.index.IndexPair;
/**
* The window state for the window operator. To be general enough, this class
......@@ -29,54 +26,24 @@ import eu.stratosphere.streaming.index.IndexPair;
* stream into multiple mini batches.
*/
public class WindowState<K> {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private int initTimestamp;
private int nextTimestamp;
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
HashMap<K, IndexPair> windowIndex;
CircularFifoBuffer buffer;
StreamRecord tempRecord;
public WindowState(int windowSize, int slidingStep, int computeGranularity,
int windowFieldId) {
this.windowSize = windowSize;
this.slidingStep = slidingStep;
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
this.initTimestamp = -1;
this.nextTimestamp = -1;
public WindowState(int windowSize, int slidingStep, int computeGranularity) {
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep are divisible by
// computeGranularity.
this.fullRecordCount = windowSize / computeGranularity;
this.slideRecordCount = slidingStep / computeGranularity;
this.windowIndex = new HashMap<K, IndexPair>();
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(StreamRecord record) {
if (initTimestamp == -1) {
initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId);
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
for (int i = 0; i < record.getNumOfTuples(); ++i) {
while ((Integer) record.getTuple(i).getField(windowFieldId) > nextTimestamp) {
buffer.add(tempRecord);
currentRecordCount += 1;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addTuple(record.getTuple(i));
}
buffer.add(record);
currentRecordCount += 1;
}
public StreamRecord popFront() {
......@@ -89,7 +56,7 @@ public class WindowState<K> {
return currentRecordCount >= fullRecordCount;
}
public boolean isComputable() {
public boolean isEmittable() {
if (currentRecordCount == fullRecordCount + slideRecordCount) {
currentRecordCount -= slideRecordCount;
return true;
......
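To make the divisibility comment in the WindowState constructor concrete, here is a small stand-alone sketch of the mini-batch arithmetic; the parameter values are the ones WindowSumAggregate uses above, everything else is illustrative.

public class WindowStateMath {
	public static void main(String[] args) {
		// mirrors the counters kept inside WindowState
		int windowSize = 100, slidingStep = 20, computeGranularity = 10;
		int fullRecordCount = windowSize / computeGranularity;   // 10 mini-batches fill the window
		int slideRecordCount = slidingStep / computeGranularity; // 2 more mini-batches per slide
		System.out.println("isFull() after " + fullRecordCount + " mini-batches");
		System.out.println("isEmittable() every " + slideRecordCount + " mini-batches afterwards");
	}
}

With these values the buffer holds ten mini-batches; once full, every second mini-batch pushed back makes isEmittable() return true and the window slides forward.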
......@@ -13,18 +13,24 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.faulttolerance;
package eu.stratosphere.streaming.state.manager;
import static org.junit.Assert.*;
import java.util.LinkedList;
import org.junit.Test;
import eu.stratosphere.streaming.state.TableState;
public class FaultToleranceTypeTest {
public class StateCheckpointer {
@Test
public void test() {
assertEquals(FaultToleranceType.NONE, FaultToleranceType.from(0));
assertEquals(FaultToleranceType.AT_LEAST_ONCE, FaultToleranceType.from(1));
assertEquals(FaultToleranceType.EXACTLY_ONCE, FaultToleranceType.from(2));
private LinkedList<TableState> stateList = new LinkedList<TableState>();
public void RegisterState(TableState state){
stateList.add(state);
}
public void CheckpointStates(){
for (TableState state : stateList) {
// take a snapshot of every registered state (the snapshot logic is not implemented yet)
}
}
}
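A minimal usage sketch for the checkpointer framework added here; since the snapshot body is still empty in this commit, the example only shows registration and the trigger call. The no-arg TableState constructor is an assumption.

import eu.stratosphere.streaming.state.TableState;
import eu.stratosphere.streaming.state.manager.StateCheckpointer;

public class CheckpointerSketch {
	public static void main(String[] args) {
		StateCheckpointer checkpointer = new StateCheckpointer();
		TableState state = new TableState(); // assumed no-arg constructor
		checkpointer.RegisterState(state);   // an operator registers its state once
		checkpointer.CheckpointStates();     // a checkpoint pass walks the registered list
	}
}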
......@@ -12,10 +12,9 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamrecord;
public class FieldTypeMismatchException extends StreamRecordException{
private static final long serialVersionUID = 591915105653934643L;
package eu.stratosphere.streaming.state.manager;
public class StateRestorer {
}
package eu.stratosphere.streaming.util;
import java.net.InetSocketAddress;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.ProgramInvocationException;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
public class ClusterUtil {
/**
* Executes the given JobGraph locally, on a NepheleMiniCluster
*
* @param jobGraph
* The JobGraph to execute
*/
public static void runOnMiniCluster(JobGraph jobGraph) {
System.out.println("Running on mini cluster");
Configuration configuration = jobGraph.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
try {
exec.start();
client.run(jobGraph, true);
exec.stop();
} catch (Exception e) {
}
}
public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
System.out.println("Running on local cluster");
Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(IP, port), configuration);
try {
client.run(jobGraph, true);
} catch (ProgramInvocationException e) {
}
}
}
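For reference, a sketch of how this helper was invoked by the examples that previously relied on it, before they switched to the explicit NepheleMiniCluster/Client code shown above; the remote host and port are placeholders, and any of the example job graphs could stand in for the WordCount one.

import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.examples.wordcount.WordCountLocal;
import eu.stratosphere.streaming.util.ClusterUtil;

public class ClusterUtilSketch {
	public static void main(String[] args) {
		JobGraph jobGraph = WordCountLocal.getJobGraph(); // any example graph works here
		ClusterUtil.runOnMiniCluster(jobGraph);           // local run on a NepheleMiniCluster
		// remote run against a running job manager (host and port are placeholders):
		// ClusterUtil.runOnLocalCluster(jobGraph, "jobmanager-host", 6123);
	}
}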
......@@ -29,7 +29,6 @@ public class LogUtils {
public static void initializeDefaultConsoleLogger(Level logLevel, Level rootLevel) {
Logger logger = Logger.getLogger("eu.stratosphere.streaming");
logger.removeAllAppenders();
logger.setAdditivity(false);
PatternLayout layout = new PatternLayout();
//layout.setConversionPattern("%highlight{%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n}");
//TODO Add highlight
......
......@@ -21,7 +21,11 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -36,51 +40,29 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class StreamComponentTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
private static boolean fPTest = true;
public static class MySource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
String out;
public MySource() {
}
public MySource(String string) {
out = string;
}
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
@Override
public void invoke() throws Exception {
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 1000; i++) {
record.setField(0, i);
emit(record);
}
}
@Override
public String getResult() {
return out;
}
}
public static class MyTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
String out;
public MyTask() {
}
public MyTask(String string) {
out = string;
}
@Override
......@@ -89,21 +71,10 @@ public class StreamComponentTest {
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(i, i + 1)));
}
@Override
public String getResult() {
return out;
}
}
public static class MySink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
String out;
public MySink(String out) {
this.out = out;
public MySink() {
}
@Override
......@@ -115,28 +86,47 @@ public class StreamComponentTest {
@Override
public String getResult() {
return out;
return "";
}
}
@BeforeClass
public static void runStream() {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
root.addAppender(new ConsoleAppender());
root.setLevel(Level.OFF);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", new MySource("source"));
graphBuilder.setTask("MyTask", new MyTask("task"), 2, 2);
graphBuilder.setSink("MySink", new MySink("sink"));
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("MySource", MySource.class);
graphBuilder.setTask("MyTask", MyTask.class, 2, 2);
graphBuilder.setSink("MySink", MySink.class);
graphBuilder.shuffleConnect("MySource", "MyTask");
graphBuilder.shuffleConnect("MyTask", "MySink");
ClusterUtil.runOnMiniCluster(graphBuilder.getJobGraph());
JobGraph jG = graphBuilder.getJobGraph();
Configuration configuration = jG.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
try {
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void test() {
assertEquals(100, data.keySet().size());
Assert.assertTrue(fPTest);
assertEquals(1000, data.keySet().size());
for (Integer k : data.keySet()) {
assertEquals((Integer) (k + 1), data.get(k));
......
......@@ -258,47 +258,6 @@ public class StreamRecordTest {
fail();
} catch (NoSuchFieldException e) {
}
try {
a.getBoolean(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getByte(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getCharacter(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getDouble(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getFloat(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getInteger(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getLong(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getShort(0);
fail();
} catch (FieldTypeMismatchException e) {}
StreamRecord c = new StreamRecord(new Tuple1<Integer>(1));
try {
c.getString(0);
fail();
} catch (FieldTypeMismatchException e) {}
}
@Test
......
......@@ -33,7 +33,7 @@ public class FaultToleranceUtilTest {
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
int[] numOfOutputchannels = { 1, 2 };
faultTolerancyBuffer = new FaultToleranceUtil(FaultToleranceType.EXACTLY_ONCE, outputs, 1, numOfOutputchannels);
faultTolerancyBuffer = new FaultToleranceUtil(outputs, 1, numOfOutputchannels);
}
@Test
......
......@@ -83,7 +83,7 @@ public class InternalStateTest {
@Test
public void WindowStateTest(){
WindowState<String> state=new WindowState<String>(100, 20, 10, 2);
WindowState<String> state=new WindowState<String>(100, 20, 10);
}
}
......@@ -10,107 +10,112 @@ import pandas as pd
import os
import operator
linestyles = ['_', '-', '--', ':']
markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>'];
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
def readFiles(csv_dir):
counters=[]
dataframes=[]
for fname in os.listdir(csv_dir):
if '.csv' in fname:
counters.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return counters
dataframes.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return dataframes
def plotCounter(csv_dir, sname='', smooth=5,savePath=''):
counters= readFiles(csv_dir)
addSpeed(counters)
def plotCounter(csv_dir,name='', smooth=5):
dataframes= readFiles(csv_dir)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
if name=='':
selectedCounters=[]
for (name, number, df) in counters:
if sname in name:
selectedCounters.append((name, number, df))
if sname=='':
sname='counters'
save=savePath!=''
plotDfs(selectedCounters,smooth,save,savePath+'/'+sname)
def plotDfs(counters,smooth,save,saveFile):
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
for (name, number, df) in counters:
plt.title('dC/dT')
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
m=markers[number%len(markers)]
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
else:
df2=[]
for dataframe in dataframes:
if name in dataframe[0]:
df2.append(dataframe)
for dataframe in df2:
m=markers[dataframe[1]%len(markers)]
df.ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'C.png')
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for (name, number, df) in counters:
for dataframe in df2:
m=markers[number%len(markers)]
m=markers[dataframe[1]%len(markers)]
pd.rolling_mean(df.speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'D.png')
def addSpeed(counters):
for (tname, number, df) in counters:
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
return counters
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
def plotThroughput(csv_dir,taskname, smooth=5):
dataframes= readFiles(csv_dir)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
selected={}
for df in dataframes:
if taskname in df[0]:
if df[1] in selected:
selected[df[1]].append(df[2])
else:
selected[df[1]]=[df[2]]
plt.figure()
plt.title(taskname)
for i in selected:
selected[i]=reduce(operator.add,selected[i])
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
def plotThroughput(csv_dir,tasknames, smooth=5,savePath=''):
if type(tasknames)!=list:
tasknames=[tasknames]
for taskname in tasknames:
counters= readFiles(csv_dir)
addSpeed(counters)
selected={}
for (tname, number, df) in counters:
if taskname in tname:
if number in selected:
selected[number].append(df)
else:
selected[number]=[df]
plt.figure()
plt.title(taskname)
for i in selected:
if len(selected[i])>1:
selected[i]=reduce(operator.add,selected[i])
else:
selected[i]=selected[i][0]
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'C.png')
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'D.png')
plt.legend(selected.keys())
def plotTimer(csv_dir,smooth=5,std=50):
dataframes= readFiles(csv_dir)
......