提交 ad1a95cb 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] add stream join and stream window join example, refactor window state

上级 f6c8a62b
......@@ -78,6 +78,11 @@
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
<build>
......
......@@ -15,6 +15,9 @@
package eu.stratosphere.streaming.api;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -41,6 +44,7 @@ import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......@@ -58,14 +62,17 @@ public class JobGraphBuilder {
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
private FaultToleranceType faultToleranceType;
/**
* Creates a new JobGraph with the given name
*
* @param jobGraphName
* Name of the JobGraph
* @param faultToleranceType
* Fault tolerance type
*/
public JobGraphBuilder(String jobGraphName) {
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
......@@ -73,10 +80,22 @@ public class JobGraphBuilder {
maxParallelismVertexName = "";
maxParallelism = 0;
log.debug("JobGraph created");
this.faultToleranceType = faultToleranceType;
}
/**
* Adds a source component to the JobGraph
* Creates a new JobGraph with the given name with fault tolerance turned
* off
*
* @param jobGraphName
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName) {
this(jobGraphName, FaultToleranceType.NONE);
}
/**
* Adds a source component to the JobGraph with no parallelism
*
* @param sourceName
* Name of the source component
......@@ -110,7 +129,42 @@ public class JobGraphBuilder {
}
/**
* Adds a task component to the JobGraph
* Adds source to the JobGraph by user defined object with no parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableObject
* User defined UserSourceInvokable object or other predefined
* source object
*/
public void setSource(String sourceName, UserSourceInvokable InvokableObject) {
setSource(sourceName, InvokableObject, 1, 1);
}
/**
* Adds source to the JobGraph by user defined object with the set
* parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableObject
* User defined UserSourceInvokable object or other predefined
* source object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSource(String sourceName, UserSourceInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableObject, parallelism, subtasksPerInstance, source);
log.debug("SOURCE: " + sourceName);
}
/**
* Adds a task component to the JobGraph with no parallelism
*
* @param taskName
* Name of the task component
......@@ -142,7 +196,39 @@ public class JobGraphBuilder {
}
/**
* Adds a sink component to the JobGraph
* Adds a task component to the JobGraph with no parallelism
*
* @param taskName
* Name of the task component
* @param TaskInvokableObject
* User defined UserTaskInvokable object
*/
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject) {
setTask(taskName, TaskInvokableObject, 1, 1);
}
/**
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the task component
* @param TaskInvokableObject
* User defined UserTaskInvokable object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject, int parallelism,
int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, TaskInvokableObject, parallelism, subtasksPerInstance, task);
log.debug("TASK: " + taskName);
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
......@@ -173,6 +259,53 @@ public class JobGraphBuilder {
log.debug("SINK: " + sinkName);
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableObject
* User defined UserSinkInvokable object
*/
public void setSink(String sinkName, UserSinkInvokable InvokableObject) {
setSink(sinkName, InvokableObject, 1, 1);
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableObject
* User defined UserSinkInvokable object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSink(String sinkName, UserSinkInvokable InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableObject, parallelism, subtasksPerInstance, sink);
log.debug("SINK: " + sinkName);
}
/**
* Sets JobVertex configuration based on the given parameters
*
* @param componentName
* Name of the component
* @param InvokableClass
* Class of the user defined Invokable
* @param parallelism
* Number of subtasks
* @param subtasksPerInstance
* Number of subtasks per instance
* @param component
* AbstractJobVertex associated with the component
*/
private void setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
......@@ -187,10 +320,66 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("faultToleranceType", faultToleranceType.id);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
}
private void setComponent(String componentName, UserSourceInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void setComponent(String componentName, UserTaskInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
private void setComponent(String componentName, UserSinkInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
addSerializedObject(InvokableObject, component);
}
/**
* Adds serialized invokable object to the JobVertex configuration
*
* @param InvokableObject
* Invokable object to serialize
* @param component
* JobVertex to which the serialized invokable will be added
*/
private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
Configuration config = component.getConfiguration();
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(InvokableObject);
config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) {
e.printStackTrace();
System.out.println("Serialization error " + InvokableObject.getClass());
}
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
......@@ -226,6 +415,13 @@ public class JobGraphBuilder {
}
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Share will be called on this component
* @param component2
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
......@@ -233,6 +429,9 @@ public class JobGraphBuilder {
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
public void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
......@@ -349,6 +548,9 @@ public class JobGraphBuilder {
}
}
/**
* Writes number of inputs into each JobVertex's config
*/
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs",
......@@ -356,6 +558,10 @@ public class JobGraphBuilder {
}
}
/**
* Writes the number of outputs and output channels into each JobVertex's
* config
*/
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfOutputs",
......@@ -372,7 +578,9 @@ public class JobGraphBuilder {
}
/**
* @return The JobGraph object
* Returns the JobGraph
*
* @return JobGraph object
*/
public JobGraph getJobGraph() {
setAutomaticInstanceSharing();
......
......@@ -19,7 +19,10 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class DefaultSinkInvokable extends UserSinkInvokable {
@Override
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0, 0);
System.out.println(value.getValue());
......
......@@ -20,14 +20,16 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultSourceInvokable extends UserSourceInvokable {
private String motto = "Stratosphere -- Big Data looks tiny from here";
private String[] mottoArray = motto.split(" ");
private static final long serialVersionUID = 1L;
@Override
public void invoke() throws Exception {
for (String word : mottoArray) {
emit( new StreamRecord(new Tuple1<String>(word)));
}
}
private String motto = "Stratosphere -- Big Data looks tiny from here";
private String[] mottoArray = motto.split(" ");
@Override
public void invoke() throws Exception {
for (String word : mottoArray) {
emit(new StreamRecord(new Tuple1<String>(word)));
}
}
}
......@@ -18,6 +18,9 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultTaskInvokable extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
emit(record);
......
......@@ -17,6 +17,6 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public interface RecordInvokable extends UserInvokable {
public interface RecordInvokable extends UserInvokable {
public void invoke(StreamRecord record) throws Exception;
}
......@@ -15,8 +15,12 @@
package eu.stratosphere.streaming.api.invokable;
public abstract class UserSinkInvokable implements RecordInvokable {
// TODO consider a common interface with StreamInvokableComponents
import java.io.Serializable;
public abstract class UserSinkInvokable implements RecordInvokable, Serializable {
private static final long serialVersionUID = 1L;
public String getResult() {
return "Override getResult() to pass your own results";
}
......
......@@ -15,7 +15,13 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable {
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable,
Serializable {
private static final long serialVersionUID = 1L;
}
......@@ -15,7 +15,12 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserTaskInvokable extends StreamInvokableComponent implements RecordInvokable {
public abstract class UserTaskInvokable extends StreamInvokableComponent implements
RecordInvokable, Serializable {
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
......@@ -15,7 +15,9 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
......@@ -34,10 +36,12 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......@@ -51,6 +55,37 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return numComponents;
}
public RecordInvoker setFaultTolerance(FaultToleranceUtil util, FaultToleranceType type,
Configuration config, List<RecordWriter<StreamRecord>> outputs, int taskInstanceID,
String name, int[] numberOfOutputChannels) {
type = FaultToleranceType.from(config.getInteger("faultToleranceType", 0));
RecordInvoker invoker = getRecordInvoker(type);
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
util = new FaultToleranceUtil(type, outputs, taskInstanceID, name,
numberOfOutputChannels);
break;
case NONE:
default:
util = null;
break;
}
return invoker;
}
public RecordInvoker getRecordInvoker(FaultToleranceType type) {
switch (type) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
return new InvokerWithFaultTolerance();
case NONE:
default:
return new Invoker();
}
}
public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
......@@ -75,22 +110,25 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public void setConfigInputs(T taskBase, Configuration taskConfiguration, List<StreamRecordReader<StreamRecord>> inputs)
throws StreamComponentException {
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<StreamRecordReader<StreamRecord>> inputs) throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase, StreamRecord.class));
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase,
StreamRecord.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase, StreamRecord.class));
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase,
StreamRecord.class));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
}
}
public void setConfigOutputs(T taskBase, Configuration taskConfiguration, List<RecordWriter<StreamRecord>> outputs,
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
......@@ -98,10 +136,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase, StreamRecord.class, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase, StreamRecord.class,
outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
......@@ -110,33 +149,66 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
if (userFunctionSerialized != null) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (UserSinkInvokable) ois.readObject();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
// TODO consider logging stack trace!
public StreamInvokableComponent getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name, FaultToleranceUtil recordBuffer) {
List<RecordWriter<StreamRecord>> outputs, int instanceID, String name,
FaultToleranceUtil recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultTaskInvokable.class, StreamInvokableComponent.class);
Class<? extends StreamInvokableComponent> userFunctionClass = taskConfiguration.getClass(
"userfunction", DefaultTaskInvokable.class, StreamInvokableComponent.class);
StreamInvokableComponent userFunction = null;
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceBuffer", 0));
byte[] userFunctionSerialized = taskConfiguration.getBytes("serializedudf", null);
if (userFunctionSerialized != null) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamInvokableComponent) ois.readObject();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (Exception e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer);
} catch (InstantiationException e) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
return userFunction;
}
......@@ -158,27 +230,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass("partitionerClass_"
+ nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
int keyPosition = taskConfiguration
.getInteger("partitionerIntParam_" + nrOutput, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
partitioners.add(partitioner.newInstance());
}
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput + " outputs");
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
} catch (Exception e) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs", e);
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with "
+ nrOutput + " outputs", e);
}
}
public void invokeRecords(RecordInvokable userFunction, List<StreamRecordReader<StreamRecord>> inputs, String name)
throws Exception {
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
List<StreamRecordReader<StreamRecord>> inputs, String name) throws Exception {
List<StreamRecordReader<StreamRecord>> closedInputs = new LinkedList<StreamRecordReader<StreamRecord>>();
boolean hasInput = true;
while (hasInput) {
......@@ -186,13 +261,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
for (StreamRecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
StreamRecord record = input.next();
// UID id = record.getId();
userFunction.invoke(record);
// threadSafePublish(new AckEvent(id), input);
// log.debug("ACK: " + id + " -- " + name);
}
else if (input.isInputClosed()) {
invoker.call(name, userFunction, input);
} else if (input.isInputClosed()) {
closedInputs.add(input);
}
}
......@@ -200,4 +270,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public static interface RecordInvoker {
void call(String name, RecordInvokable userFunction, StreamRecordReader<StreamRecord> input)
throws Exception;
}
public class InvokerWithFaultTolerance implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
UID id = record.getId();
userFunction.invoke(record);
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
}
}
public static class Invoker implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
userFunction.invoke(record);
}
}
}
\ No newline at end of file
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
......@@ -22,10 +23,13 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent {
public abstract class StreamInvokableComponent implements Serializable {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
......@@ -35,36 +39,52 @@ public abstract class StreamInvokableComponent {
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords) {
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void setPerfCounterDir(String dir) {
performanceCounter.setFname(dir + "/" + name + channelID);
}
public final void emit(StreamRecord record) {
record.setId(channelID);
// emittedRecords.addRecord(record);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
emittedRecords.failRecord(record.getId());
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
// TODO: Add fault tolerance
// TODO: Should we fail record at exception catch?
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
// emittedRecords.addRecord(record, outputChannel);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
......
......@@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
......@@ -34,6 +36,7 @@ public class StreamSink extends AbstractOutputTask {
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
private RecordInvoker invoker;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -52,13 +55,17 @@ public class StreamSink extends AbstractOutputTask {
} catch (Exception e) {
log.error("Cannot register inputs", e);
}
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0));
invoker = streamSinkHelper.getRecordInvoker(faultToleranceType);
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
streamSinkHelper.invokeRecords(userFunction, inputs, name);
streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name);
System.out.println("Result: "+userFunction.getResult());
log.debug("SINK " + name + " invoke finished");
}
......
......@@ -28,6 +28,7 @@ import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> {
......@@ -41,6 +42,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
private int sourceInstanceID;
private String name;
private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() {
......@@ -49,7 +51,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
streamSourceHelper = new StreamComponentHelper<StreamSource>();
numSources=StreamComponentHelper.newComponent();
numSources = StreamComponentHelper.newComponent();
sourceInstanceID = numSources;
}
......@@ -69,20 +71,21 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
} catch (StreamComponentException e) {
log.error("Cannot register outputs", e);
}
int[] numberOfOutputChannels= new int[outputs.size()];
for(int i=0; i<numberOfOutputChannels.length;i++ ){
numberOfOutputChannels[i]=taskConfiguration.getInteger("channels_"+i, 0);
int[] numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(
taskConfiguration, outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, sourceInstanceID, name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(taskConfiguration,
outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
}
......@@ -93,7 +96,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
userFunction.invoke();
// TODO print to file
System.out.println(userFunction.getResult());
}
}
......@@ -26,7 +26,9 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask {
......@@ -40,7 +42,9 @@ public class StreamTask extends AbstractTask {
private static int numTasks;
private int taskInstanceID;
private String name;
StreamComponentHelper<StreamTask> streamTaskHelper;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
private RecordInvoker invoker;
Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer;
......@@ -73,7 +77,9 @@ public class StreamTask extends AbstractTask {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels);
invoker = streamTaskHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, taskInstanceID, name, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration,
outputs, taskInstanceID, name, recordBuffer);
......@@ -84,10 +90,10 @@ public class StreamTask extends AbstractTask {
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(userFunction, inputs, name);
streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
}
}
\ No newline at end of file
......@@ -18,15 +18,16 @@ package eu.stratosphere.streaming.api.streamcomponent;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class StreamWindowTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private int computeGranularity;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
......@@ -35,7 +36,7 @@ public class StreamWindowTask extends UserTaskInvokable {
int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
window = new WindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamrecord;
public class FieldTypeMismatchException extends StreamRecordException{
private static final long serialVersionUID = 591915105653934643L;
}
......@@ -86,16 +86,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates empty StreamRecord with number of fields set
*
......@@ -124,6 +114,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch = new ArrayList<Tuple>(batchSize);
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates a new batch of records containing only the given Tuple as element
* and sets desired batch size.
......@@ -167,68 +167,20 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Remove all the contents inside StreamRecord.
*/
public void Clear(){
this.numOfTuples = 0;
tupleBatch.clear();
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
* Checks whether the record batch is empty
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
* @return true if the batch is empty, false if it contains Tuples
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
public boolean isEmpty() {
return (this.numOfTuples == 0);
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
* Remove all the contents inside StreamRecord.
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
public boolean isEmpty() {
return (this.numOfTuples == 0);
public void Clear() {
this.numOfTuples = 0;
tupleBatch.clear();
}
/**
......@@ -352,11 +304,15 @@ public class StreamRecord implements IOReadableWritable, Serializable {
// TODO: add exception for cast for all getters
public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Boolean) getField(tupleNumber, fieldNumber);
try {
return (Boolean) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
* Get a Byte from the given field of the first Tuple of the batch
* Get a Byte from thne given field of the first Tuple of the batch
*
* @param fieldNumber
* Position of the field in the tuple
......@@ -381,7 +337,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Byte) getField(tupleNumber, fieldNumber);
try {
return (Byte) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -411,7 +371,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Character) getField(tupleNumber, fieldNumber);
try {
return (Character) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -440,7 +404,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Double) getField(tupleNumber, fieldNumber);
try {
return (Double) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -469,7 +437,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Float) getField(tupleNumber, fieldNumber);
try {
return (Float) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -498,7 +470,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Integer) getField(tupleNumber, fieldNumber);
try {
return (Integer) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -527,7 +503,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Long) getField(tupleNumber, fieldNumber);
try {
return (Long) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -556,7 +536,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Short) getField(tupleNumber, fieldNumber);
try {
return (Short) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -583,7 +567,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (String) getField(tupleNumber, fieldNumber);
try {
return (String) getField(tupleNumber, fieldNumber);
} catch (ClassCastException e) {
throw new FieldTypeMismatchException();
}
}
/**
......@@ -612,7 +600,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
// TODO: consider no such tuple exception and interaction with batch size
// TODO: consider interaction with batch size
public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException {
try {
tupleBatch.get(tupleNumber).setField(o, fieldNumber);
......@@ -1002,6 +990,57 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
* deserializing it
......@@ -1298,8 +1337,12 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public String toString() {
StringBuilder outputString = new StringBuilder("[");
String prefix = "";
for (Tuple tuple : tupleBatch) {
outputString.append(tuple + ",");
outputString.append(prefix);
prefix = ",";
outputString.append(tuple.toString());
}
outputString.append("]");
return outputString.toString();
......
......@@ -17,6 +17,7 @@ package eu.stratosphere.streaming.api.streamrecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
......@@ -65,6 +66,18 @@ public class UID implements IOReadableWritable, Serializable {
public void write(DataOutput out) throws IOException {
out.write(uid.array());
}
private void writeObject(ObjectOutputStream stream)
throws IOException {
stream.write(uid.array());
}
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
byte[] uidA = new byte[20];
stream.read(uidA);
uid = ByteBuffer.allocate(20).put(uidA);
}
@Override
public void read(DataInput in) throws IOException {
......
......@@ -14,26 +14,23 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.basictopology;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class BasicTopology {
public static class BasicSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<String>("streaming"));
@Override
......@@ -49,6 +46,7 @@ public class BasicTopology {
}
public static class BasicTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
......@@ -60,19 +58,20 @@ public class BasicTopology {
}
public static class BasicSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// do nothing
record.getField(0);
System.out.println(record.getField(0));
}
}
private static JobGraph getJobGraph() throws Exception {
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology");
graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1);
graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1);
graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1);
graphBuilder.setSource("BasicSource", new BasicSource(), 1, 1);
graphBuilder.setTask("BasicTask", new BasicTask(), 1, 1);
graphBuilder.setSink("BasicSink", new BasicSink(), 1, 1);
graphBuilder.shuffleConnect("BasicSource", "BasicTask");
graphBuilder.shuffleConnect("BasicTask", "BasicSink");
......@@ -83,34 +82,8 @@ public class BasicTopology {
public static void main(String[] args) {
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchForwardLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class);
......
......@@ -19,7 +19,8 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchForwardSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
private String word = new String();
@Override
......
......@@ -20,6 +20,8 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchForwardSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private final StreamRecord mottoRecord = new StreamRecord(
new Tuple1<String>("Stratosphere Big Data looks tiny from here"));
......
......@@ -23,7 +23,8 @@ import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
public class BatchWordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1);
......
......@@ -19,7 +19,8 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchWordCountSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
......
......@@ -24,7 +24,8 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchWordCountSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
......
......@@ -20,6 +20,8 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchWordCountSplitter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(3);
......
......@@ -24,12 +24,13 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class CellInfoLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("infoSource", InfoSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", CellTask.class, 3, 1);
......
......@@ -19,7 +19,8 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CellSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
int counter = 0;
@Override
......
......@@ -20,7 +20,8 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CellTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private WorkerEngineExact engine = new WorkerEngineExact(10, 500,
System.currentTimeMillis());
Integer cellID;
......
......@@ -22,7 +22,8 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class InfoSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
Random rand = new Random();
private final static int CELL_COUNT = 10;
......
......@@ -22,7 +22,8 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class QuerySource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
Random rand = new Random();
private final static int CELL_COUNT = 10;
private final static int LAST_MILLIS = 1000;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class JoinLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("JoinSourceOne", JoinSourceOne.class);
graphBuilder.setSource("JoinSourceTwo", JoinSourceTwo.class);
graphBuilder.setTask("JoinTask", JoinTask.class, 1, 1);
graphBuilder.setSink("JoinSink", JoinSink.class);
graphBuilder.fieldsConnect("JoinSourceOne", "JoinTask", 1);
graphBuilder.fieldsConnect("JoinSourceTwo", "JoinTask", 1);
graphBuilder.shuffleConnect("JoinTask", "JoinSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class JoinSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
for (int i = 0; i < tupleNum; ++i) {
System.out.println("name=" + record.getField(i, 0) + ", grade="
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class JoinSourceOne extends UserSourceInvokable {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, String, Integer>());
@Override
public void invoke() throws Exception {
while (true) {
outRecord.setString(0, "salary");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setInteger(2, rand.nextInt(10000));
emit(outRecord);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class JoinSourceTwo extends UserSourceInvokable {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, String, String>());
@Override
public void invoke() throws Exception {
while (true) {
outRecord.setString(0, "grade");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A')));
emit(outRecord);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class JoinTask extends UserTaskInvokable {
private static final long serialVersionUID = 749913336259789039L;
private HashMap<String, ArrayList<String>> gradeHashmap;
private HashMap<String, ArrayList<Integer>> salaryHashmap;
private StreamRecord outRecord = new StreamRecord(3);
public JoinTask() {
gradeHashmap = new HashMap<String, ArrayList<String>>();
salaryHashmap = new HashMap<String, ArrayList<Integer>>();
}
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
String streamId = record.getString(0);
String name = record.getString(1);
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
for (Integer salary : salaryHashmap.get(name)) {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, record.getString(2), salary);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
outRecord.Clear();
}
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new ArrayList<String>());
}
gradeHashmap.get(name).add(record.getString(2));
} else {
if (gradeHashmap.containsKey(name)) {
for (String grade : gradeHashmap.get(name)) {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, grade, record.getInteger(2));
outRecord.addTuple(outputTuple);
}
emit(outRecord);
outRecord.Clear();
}
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new ArrayList<Integer>());
}
salaryHashmap.get(name).add(record.getInteger(2));
}
}
}
......@@ -35,7 +35,8 @@ public class IncrementalLearningSkeleton {
// Source for feeding new data for prediction
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>(1));
@Override
......@@ -56,7 +57,8 @@ public class IncrementalLearningSkeleton {
// Source for feeding new training data for partial model building
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// Number of tuples grouped for building partial model
private final int BATCH_SIZE = 1000;
......@@ -87,7 +89,8 @@ public class IncrementalLearningSkeleton {
// Task for building up-to-date partial models on new training data
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
......@@ -103,7 +106,8 @@ public class IncrementalLearningSkeleton {
// Task for performing prediction using the model produced in
// batch-processing and the up-to-date partial model
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
StreamRecord batchModel = null;
StreamRecord partialModel = null;
......@@ -136,7 +140,8 @@ public class IncrementalLearningSkeleton {
}
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// do nothing
......
......@@ -14,7 +14,6 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml;
import java.net.InetSocketAddress;
import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
......@@ -24,21 +23,22 @@ import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(2, 1);
Random rnd = new Random();
......@@ -46,7 +46,7 @@ public class IncrementalOLS {
@Override
public void invoke() throws Exception {
record.initRecords();
for (int j = 0; j < 100; j++) {
while (true) {
// pull new record from data source
record.setTuple(getNewData());
emit(record);
......@@ -62,8 +62,9 @@ public class IncrementalOLS {
}
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private final int BATCH_SIZE = 10;
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
......@@ -74,7 +75,7 @@ public class IncrementalOLS {
record.initRecords();
for (int j = 0; j < 1000; j++) {
while (true) {
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
......@@ -92,6 +93,7 @@ public class IncrementalOLS {
}
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
......@@ -109,8 +111,8 @@ public class IncrementalOLS {
for (int i = 0; i < numOfTuples; i++) {
Tuple t = record.getTuple(i);
Double[] x_i = t.getField(1);
y[i] = t.getField(0);
Double[] x_i = (Double[]) t.getField(1);
y[i] = (Double) t.getField(0);
for (int j = 0; j < numOfFeatures; j++) {
x[i][j] = x_i[j];
}
......@@ -125,6 +127,7 @@ public class IncrementalOLS {
}
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
// StreamRecord batchModel = null;
Double[] partialModel = new Double[] { 0.0, 0.0 };
......@@ -162,21 +165,22 @@ public class IncrementalOLS {
}
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record.getTuple());
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS");
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS",
FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
graphBuilder.setTask("PartialModelBuilder", PartialModelBuilder.class, 1, 1);
graphBuilder.setTask("Predictor", Predictor.class, 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1);
graphBuilder.setSource("NewData", new NewDataSource(), 1, 1);
graphBuilder.setSource("TrainingData",new TrainingDataSource(), 1, 1);
graphBuilder.setTask("PartialModelBuilder",new PartialModelBuilder(), 1, 1);
graphBuilder.setTask("Predictor",new Predictor(), 1, 1);
graphBuilder.setSink("Sink",new Sink(), 1, 1);
graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
graphBuilder.shuffleConnect("NewData", "Predictor");
......@@ -189,34 +193,18 @@ public class IncrementalOLS {
public static void main(String[] args) {
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
if (args.length == 0) {
args = new String[] { "local" };
}
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
} catch (Exception e) {
System.out.println(e);
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowJoinLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WindowJoinSourceOne", WindowJoinSourceOne.class);
graphBuilder.setSource("WindowJoinSourceTwo", WindowJoinSourceTwo.class);
graphBuilder.setTask("WindowJoinTask", WindowJoinTask.class, 1, 1);
graphBuilder.setSink("WindowJoinSink", WindowJoinSink.class);
graphBuilder.fieldsConnect("WindowJoinSourceOne", "WindowJoinTask", 1);
graphBuilder.fieldsConnect("WindowJoinSourceTwo", "WindowJoinTask", 1);
graphBuilder.shuffleConnect("WindowJoinTask", "WindowJoinSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowJoinSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
for (int i = 0; i < tupleNum; ++i) {
System.out.println("name=" + record.getField(i, 0) + ", grade="
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowJoinSourceOne extends UserSourceInvokable {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple4<String, String, Integer, Long>());
private long progress = 0L;
@Override
public void invoke() throws Exception {
while (true) {
outRecord.setString(0, "salary");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setInteger(2, rand.nextInt(10000));
outRecord.setLong(3, progress);
emit(outRecord);
progress+=1;
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowJoinSourceTwo extends UserSourceInvokable {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple4<String, String, String, Long>());
private long progress = 0L;
@Override
public void invoke() throws Exception {
while (true) {
outRecord.setString(0, "grade");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A')));
outRecord.setLong(3, progress);
emit(outRecord);
progress+=1;
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowJoinTask extends UserTaskInvokable {
class SalaryProgress {
public SalaryProgress(int salary, long progress) {
this.salary = salary;
this.progress = progress;
}
int salary;
long progress;
}
class GradeProgress {
public GradeProgress(String grade, long progress) {
this.grade = grade;
this.progress = progress;
}
String grade;
long progress;
}
private static final long serialVersionUID = 749913336259789039L;
private int windowSize = 100;
private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
private StreamRecord outRecord = new StreamRecord(3);
public WindowJoinTask() {
gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
}
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
String streamId = record.getString(0);
String name = record.getString(1);
long progress = record.getLong(3);
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
Iterator<SalaryProgress> iterator = salaryHashmap.get(name)
.iterator();
while (iterator.hasNext()) {
SalaryProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, record.getString(2), entry.salary);
outRecord.addTuple(outputTuple);
}
}
if (outRecord.getNumOfTuples() != 0) {
emit(outRecord);
}
outRecord.Clear();
}
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new LinkedList<GradeProgress>());
}
gradeHashmap.get(name).add(
new GradeProgress(record.getString(2), progress));
} else {
if (gradeHashmap.containsKey(name)) {
Iterator<GradeProgress> iterator = gradeHashmap.get(name)
.iterator();
while (iterator.hasNext()) {
GradeProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, entry.grade, record.getInteger(2));
outRecord.addTuple(outputTuple);
}
}
if (outRecord.getNumOfTuples() != 0) {
emit(outRecord);
}
outRecord.Clear();
}
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new LinkedList<SalaryProgress>());
}
salaryHashmap.get(name).add(
new SalaryProgress(record.getInteger(2), progress));
}
}
}
......@@ -19,17 +19,18 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class WindowSumAggregate extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
......@@ -38,7 +39,7 @@ public class WindowSumAggregate extends UserTaskInvokable {
new Tuple2<Integer, Long>());
public WindowSumAggregate() {
window = new WindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
......
......@@ -20,7 +20,8 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumMultiple extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outputRecord = new StreamRecord(new Tuple2<Integer, Long>());
@Override
......
......@@ -19,7 +19,8 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
private Integer sum = 0;
private long timestamp = 0;
......
......@@ -20,7 +20,8 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
private Long timestamp = 0L;
......
......@@ -21,17 +21,18 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private int windowSize=10;
private int slidingStep=2;
private int computeGranularity=1;
private int windowFieldId=2;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> wordCounts;
private long initTimestamp=-1;
private long nextTimestamp=-1;
......@@ -39,7 +40,7 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
window = new WindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
computeGranularity);
wordCounts = new MutableTableState<String, Integer>();
}
......
......@@ -24,13 +24,14 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
......
......@@ -19,7 +19,8 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
......
......@@ -24,7 +24,8 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
......
......@@ -20,6 +20,8 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSplitter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private Long timestamp = 0L;
private StreamRecord outputRecord = new StreamRecord(3);
......
......@@ -21,6 +21,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
public class WordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
......
......@@ -15,24 +15,22 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", new WordCountSourceSplitter("src/test/resources/testdata/hamlet.txt"));
graphBuilder.setTask("WordCountCounter", new WordCountCounter());
graphBuilder.setSink("WordCountSink", new WordCountSink());
graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
......@@ -44,39 +42,15 @@ public class WordCountLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
if (args.length == 0) {
args = new String[] { "local" };
}
}
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
} catch (Exception e) {
System.out.println(e);
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
}
}
......
......@@ -34,6 +34,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceCounter;
......@@ -41,6 +42,7 @@ public class WordCountRemote {
private final static int recordsEmitted = 100000;
public static class WordCountDebugSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private PerformanceCounter perf = new PerformanceCounter("SourceEmitCounter", 1000, 10000, "");
......@@ -69,7 +71,8 @@ public class WordCountRemote {
}
public static class WordCountDebugSplitter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private PerformanceCounter perf = new PerformanceCounter("SplitterEmitCounter", 1000, 10000, "");
private String[] words = new String[] {};
......@@ -141,7 +144,7 @@ public class WordCountRemote {
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
......
......@@ -19,6 +19,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
private String word = "";
private Integer count = 0;
......
......@@ -23,6 +23,7 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = new String();
......
......@@ -23,22 +23,27 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSourceSplitter extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = new String();
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private String fileName;
public WordCountSourceSplitter(String fileName) {
this.fileName = fileName;
}
@Override
public void invoke() throws Exception {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
br = new BufferedReader(new FileReader(fileName));
while (true) {
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
line = line.replaceAll("[\\-\\+\\.\\^:,]", "");
for (String word : line.split(" ")) {
outRecord.setString(0, word);
System.out.println("word=" + word);
......
......@@ -22,6 +22,7 @@ import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountSplitter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
......
......@@ -24,6 +24,7 @@ import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
......@@ -31,7 +32,7 @@ public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.faulttolerance;
import java.util.HashMap;
import java.util.Map;
public enum FaultToleranceType {
NONE(0), AT_LEAST_ONCE(1), EXACTLY_ONCE(2);
public final int id;
FaultToleranceType(int id) {
this.id = id;
}
private static final Map<Integer, FaultToleranceType> map = new HashMap<Integer, FaultToleranceType>();
static {
for (FaultToleranceType type : FaultToleranceType.values())
map.put(type.id, type);
}
public static FaultToleranceType from(int id) {
return map.get(id);
}
}
\ No newline at end of file
......@@ -39,10 +39,9 @@ public class FaultToleranceUtil {
private final int componentID;
private int numberOfChannels;
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public FaultToleranceType type;
public PerformanceTracker tracker;
public PerformanceCounter counter;
......@@ -57,20 +56,22 @@ public class FaultToleranceUtil {
* @param numberOfChannels
* Number of output channels for the output components
*/
// TODO:get faulttolerancy type from user config, update logs for channel
// TODO:update logs for channel
// acks and fails
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = true;
if (exactlyOnce) {
this.type = type;
switch (type) {
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case AT_LEAST_ONCE: case NONE: default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
......@@ -80,18 +81,19 @@ public class FaultToleranceUtil {
}
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
String componentName, int[] numberOfChannels) {
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs,
int sourceInstanceID, String componentName, int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = false;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
switch (type) {
case AT_LEAST_ONCE:
default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
}
tracker = new PerformanceTracker("pc", 1000, 1000, 10000,
......@@ -149,7 +151,7 @@ public class FaultToleranceUtil {
*/
public void failRecord(UID recordID, int channel) {
// if by ft type
if (exactlyOnce) {
if (type == FaultToleranceType.EXACTLY_ONCE) {
StreamRecord failed = buffer.failChannel(recordID, channel);
if (failed != null) {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
* Source for reading messages from a RabbitMQ queue. The source currently only support string messages. Other types will be added soon.
*
*/
public class RMQSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private final String QUEUE_NAME;
private final String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
private transient String message;
StreamRecord record = new StreamRecord(new Tuple1<String>());
public RMQSource(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
private void initializeConnection() {
factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException e) {
}
}
@Override
public void invoke() {
initializeConnection();
while (true) {
try {
delivery = consumer.nextDelivery();
} catch (ShutdownSignalException e) {
e.printStackTrace();
break;
} catch (ConsumerCancelledException e) {
e.printStackTrace();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
message = new String(delivery.getBody());
if (message.equals("q")) {
break;
}
record.setString(0, message);
emit(record);
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class RMQTopology {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record.getString(0));
}
}
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1);
graphBuilder.setSink("Sink", new Sink(), 1, 1);
graphBuilder.shuffleConnect("Source", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
\ No newline at end of file
......@@ -25,14 +25,14 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class WindowState<K> {
public class SlidingWindowState<K> {
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
CircularFifoBuffer buffer;
public WindowState(int windowSize, int slidingStep, int computeGranularity) {
public SlidingWindowState(int windowSize, int slidingStep, int computeGranularity) {
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep is divisible by
// computeGranularity.
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowStateIterator<K>{
public class SlidingWindowStateIterator<K>{
public boolean hasNext() {
// TODO Auto-generated method stub
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
import java.net.InetSocketAddress;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.ProgramInvocationException;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
public class ClusterUtil {
/**
* Executes the given JobGraph locally, on a NepheleMiniCluster
*
* @param jobGraph
*/
public static void runOnMiniCluster(JobGraph jobGraph) {
System.out.println("Running on mini cluster");
Configuration configuration = jobGraph.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
try {
exec.start();
client.run(jobGraph, true);
exec.stop();
} catch (Exception e) {
}
}
public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
System.out.println("Running on local cluster");
Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(IP, port), configuration);
try {
client.run(jobGraph, true);
} catch (ProgramInvocationException e) {
}
}
}
......@@ -29,6 +29,7 @@ public class LogUtils {
public static void initializeDefaultConsoleLogger(Level logLevel, Level rootLevel) {
Logger logger = Logger.getLogger("eu.stratosphere.streaming");
logger.removeAllAppenders();
logger.setAdditivity(false);
PatternLayout layout = new PatternLayout();
//layout.setConversionPattern("%highlight{%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n}");
//TODO Add highlight
......
......@@ -148,4 +148,8 @@ public class PerformanceTracker {
}
public void setFname(String fname) {
this.fname = fname;
}
}
......@@ -17,52 +17,65 @@ package eu.stratosphere.streaming.api.streamcomponent;
import static org.junit.Assert.assertEquals;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class StreamComponentTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
private static boolean fPTest = true;
public static class MySource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
String out;
public MySource() {
}
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
public MySource(String string) {
out = string;
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 100; i++) {
record.setField(0, i);
emit(record);
}
}
@Override
public String getResult() {
return out;
}
}
public static class MyTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
String out;
public MyTask() {
}
public MyTask(String string) {
out = string;
}
@Override
......@@ -71,10 +84,21 @@ public class StreamComponentTest {
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(i, i + 1)));
}
@Override
public String getResult() {
return out;
}
}
public static class MySink extends UserSinkInvokable {
public MySink() {
private static final long serialVersionUID = 1L;
String out;
public MySink(String out) {
this.out = out;
}
@Override
......@@ -86,47 +110,28 @@ public class StreamComponentTest {
@Override
public String getResult() {
return "";
return out;
}
}
@BeforeClass
public static void runStream() {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
root.addAppender(new ConsoleAppender());
root.setLevel(Level.OFF);
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("MySource", MySource.class);
graphBuilder.setTask("MyTask", MyTask.class, 2, 2);
graphBuilder.setSink("MySink", MySink.class);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", new MySource("source"));
graphBuilder.setTask("MyTask", new MyTask("task"), 2, 2);
graphBuilder.setSink("MySink", new MySink("sink"));
graphBuilder.shuffleConnect("MySource", "MyTask");
graphBuilder.shuffleConnect("MyTask", "MySink");
JobGraph jG = graphBuilder.getJobGraph();
Configuration configuration = jG.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
try {
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} catch (Exception e) {
e.printStackTrace();
}
ClusterUtil.runOnMiniCluster(graphBuilder.getJobGraph());
}
@Test
public void test() {
Assert.assertTrue(fPTest);
assertEquals(1000, data.keySet().size());
assertEquals(100, data.keySet().size());
for (Integer k : data.keySet()) {
assertEquals((Integer) (k + 1), data.get(k));
......
......@@ -258,6 +258,47 @@ public class StreamRecordTest {
fail();
} catch (NoSuchFieldException e) {
}
try {
a.getBoolean(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getByte(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getCharacter(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getDouble(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getFloat(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getInteger(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getLong(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getShort(0);
fail();
} catch (FieldTypeMismatchException e) {}
StreamRecord c = new StreamRecord(new Tuple1<Integer>(1));
try {
c.getString(0);
fail();
} catch (FieldTypeMismatchException e) {}
}
@Test
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.*;
import org.junit.Test;
public class FaultToleranceTypeTest {
@Test
public void test() {
assertEquals(FaultToleranceType.NONE, FaultToleranceType.from(0));
assertEquals(FaultToleranceType.AT_LEAST_ONCE, FaultToleranceType.from(1));
assertEquals(FaultToleranceType.EXACTLY_ONCE, FaultToleranceType.from(2));
}
}
......@@ -33,7 +33,7 @@ public class FaultToleranceUtilTest {
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
int[] numOfOutputchannels = { 1, 2 };
faultTolerancyBuffer = new FaultToleranceUtil(outputs, 1, numOfOutputchannels);
faultTolerancyBuffer = new FaultToleranceUtil(FaultToleranceType.EXACTLY_ONCE, outputs, 1, numOfOutputchannels);
}
@Test
......
......@@ -21,7 +21,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.LogTableState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class InternalStateTest {
......@@ -83,7 +83,7 @@ public class InternalStateTest {
@Test
public void WindowStateTest(){
WindowState<String> state=new WindowState<String>(100, 20, 10);
SlidingWindowState<String> state=new SlidingWindowState<String>(100, 20, 10);
}
}
......@@ -10,112 +10,107 @@ import pandas as pd
import os
import operator
linestyles = ['_', '-', '--', ':']
markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>'];
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
def readFiles(csv_dir):
dataframes=[]
counters=[]
for fname in os.listdir(csv_dir):
if '.csv' in fname:
dataframes.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return dataframes
counters.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return counters
def plotCounter(csv_dir,name='', smooth=5):
dataframes= readFiles(csv_dir)
def plotCounter(csv_dir, sname='', smooth=5,savePath=''):
counters= readFiles(csv_dir)
addSpeed(counters)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
if name=='':
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
selectedCounters=[]
for (name, number, df) in counters:
if sname in name:
selectedCounters.append((name, number, df))
if sname=='':
sname='counters'
save=savePath!=''
plotDfs(selectedCounters,smooth,save,savePath+'/'+sname)
def plotDfs(counters,smooth,save,saveFile):
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
plt.title('Counter')
for (name, number, df) in counters:
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
else:
df2=[]
for dataframe in dataframes:
if name in dataframe[0]:
df2.append(dataframe)
for dataframe in df2:
m=markers[dataframe[1]%len(markers)]
m=markers[number%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
df.ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'C.png')
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for dataframe in df2:
for (name, number, df) in counters:
m=markers[dataframe[1]%len(markers)]
m=markers[number%len(markers)]
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
def plotThroughput(csv_dir,taskname, smooth=5):
dataframes= readFiles(csv_dir)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
selected={}
for df in dataframes:
if taskname in df[0]:
if df[1] in selected:
selected[df[1]].append(df[2])
else:
selected[df[1]]=[df[2]]
plt.figure()
plt.title(taskname)
for i in selected:
selected[i]=reduce(operator.add,selected[i])
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
pd.rolling_mean(df.speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'D.png')
def addSpeed(counters):
for (tname, number, df) in counters:
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
return counters
plt.legend(selected.keys())
def plotThroughput(csv_dir,tasknames, smooth=5,savePath=''):
if type(tasknames)!=list:
tasknames=[tasknames]
for taskname in tasknames:
counters= readFiles(csv_dir)
addSpeed(counters)
selected={}
for (tname, number, df) in counters:
if taskname in tname:
if number in selected:
selected[number].append(df)
else:
selected[number]=[df]
plt.figure()
plt.title(taskname)
for i in selected:
if len(selected[i])>1:
selected[i]=reduce(operator.add,selected[i])
else:
selected[i]=selected[i][0]
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'C.png')
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'D.png')
def plotTimer(csv_dir,smooth=5,std=50):
dataframes= readFiles(csv_dir)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册