提交 3b787e29 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] api cleanup

上级 75431b7d
...@@ -69,6 +69,8 @@ public class JobGraphBuilder { ...@@ -69,6 +69,8 @@ public class JobGraphBuilder {
* *
* @param jobGraphName * @param jobGraphName
* Name of the JobGraph * Name of the JobGraph
* @param faultToleranceType
* Fault tolerance type
*/ */
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) { public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
jobGraph = new JobGraph(jobGraphName); jobGraph = new JobGraph(jobGraphName);
...@@ -81,6 +83,17 @@ public class JobGraphBuilder { ...@@ -81,6 +83,17 @@ public class JobGraphBuilder {
this.faultToleranceType = faultToleranceType; this.faultToleranceType = faultToleranceType;
} }
/**
* Creates a new JobGraph with the given name with fault tolerance turned
* off
*
* @param jobGraphName
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName) {
this(jobGraphName, FaultToleranceType.NONE);
}
/** /**
* Adds a source component to the JobGraph with no parallelism * Adds a source component to the JobGraph with no parallelism
* *
......
...@@ -52,6 +52,10 @@ public abstract class StreamInvokableComponent implements Serializable { ...@@ -52,6 +52,10 @@ public abstract class StreamInvokableComponent implements Serializable {
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE; this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
} }
public final void setPerfCounterDir(String dir) {
performanceCounter.setFname(dir + "/" + name + channelID);
}
public final void emit(StreamRecord record) { public final void emit(StreamRecord record) {
record.setId(channelID); record.setId(channelID);
...@@ -69,7 +73,7 @@ public abstract class StreamInvokableComponent implements Serializable { ...@@ -69,7 +73,7 @@ public abstract class StreamInvokableComponent implements Serializable {
if (useFaultTolerance) { if (useFaultTolerance) {
emittedRecords.failRecord(record.getId()); emittedRecords.failRecord(record.getId());
} }
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to " log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName()); + e.getClass().getSimpleName());
} }
......
...@@ -1324,8 +1324,12 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -1324,8 +1324,12 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public String toString() { public String toString() {
StringBuilder outputString = new StringBuilder("["); StringBuilder outputString = new StringBuilder("[");
String prefix = "";
for (Tuple tuple : tupleBatch) { for (Tuple tuple : tupleBatch) {
outputString.append(tuple + ","); outputString.append(prefix);
prefix = ",";
outputString.append(tuple.toString());
} }
outputString.append("]"); outputString.append("]");
return outputString.toString(); return outputString.toString();
......
...@@ -14,14 +14,9 @@ ...@@ -14,14 +14,9 @@
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.examples.basictopology; package eu.stratosphere.streaming.examples.basictopology;
import java.net.InetSocketAddress;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
...@@ -29,12 +24,14 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; ...@@ -29,12 +24,14 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
public class BasicTopology { public class BasicTopology {
public static class BasicSource extends UserSourceInvokable { public static class BasicSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<String>("streaming")); StreamRecord record = new StreamRecord(new Tuple1<String>("streaming"));
@Override @Override
...@@ -50,6 +47,7 @@ public class BasicTopology { ...@@ -50,6 +47,7 @@ public class BasicTopology {
} }
public static class BasicTask extends UserTaskInvokable { public static class BasicTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override @Override
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
...@@ -61,19 +59,20 @@ public class BasicTopology { ...@@ -61,19 +59,20 @@ public class BasicTopology {
} }
public static class BasicSink extends UserSinkInvokable { public static class BasicSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override @Override
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
// do nothing // do nothing
record.getField(0); System.out.println(record.getField(0));
} }
} }
private static JobGraph getJobGraph() throws Exception { private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology", FaultToleranceType.NONE); JobGraphBuilder graphBuilder = new JobGraphBuilder("BasicStreamingTopology");
graphBuilder.setSource("BasicSource", BasicSource.class, 1, 1); graphBuilder.setSource("BasicSource", new BasicSource(), 1, 1);
graphBuilder.setTask("BasicTask", BasicTask.class, 1, 1); graphBuilder.setTask("BasicTask", new BasicTask(), 1, 1);
graphBuilder.setSink("BasicSink", BasicSink.class, 1, 1); graphBuilder.setSink("BasicSink", new BasicSink(), 1, 1);
graphBuilder.shuffleConnect("BasicSource", "BasicTask"); graphBuilder.shuffleConnect("BasicSource", "BasicTask");
graphBuilder.shuffleConnect("BasicTask", "BasicSink"); graphBuilder.shuffleConnect("BasicTask", "BasicSink");
...@@ -84,34 +83,8 @@ public class BasicTopology { ...@@ -84,34 +83,8 @@ public class BasicTopology {
public static void main(String[] args) { public static void main(String[] args) {
// set logging parameters for local run // set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO); LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true); ClusterUtil.runOnMiniCluster(getJobGraph());
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
} }
} }
...@@ -154,11 +154,11 @@ public class IncrementalLearningSkeleton { ...@@ -154,11 +154,11 @@ public class IncrementalLearningSkeleton {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning", JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning",
FaultToleranceType.NONE); FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1); graphBuilder.setSource("NewData", new NewDataSource(), 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1); graphBuilder.setSource("TrainingData",new TrainingDataSource(), 1, 1);
graphBuilder.setTask("PartialModelBuilder", PartialModelBuilder.class, 1, 1); graphBuilder.setTask("PartialModelBuilder",new PartialModelBuilder(), 1, 1);
graphBuilder.setTask("Predictor", Predictor.class, 1, 1); graphBuilder.setTask("Predictor",new Predictor(), 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1); graphBuilder.setSink("Sink",new Sink(), 1, 1);
graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder"); graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
graphBuilder.shuffleConnect("NewData", "Predictor"); graphBuilder.shuffleConnect("NewData", "Predictor");
......
...@@ -180,11 +180,11 @@ public class IncrementalOLS { ...@@ -180,11 +180,11 @@ public class IncrementalOLS {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS", JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS",
FaultToleranceType.NONE); FaultToleranceType.NONE);
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1); graphBuilder.setSource("NewData", new NewDataSource(), 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1); graphBuilder.setSource("TrainingData",new TrainingDataSource(), 1, 1);
graphBuilder.setTask("PartialModelBuilder", PartialModelBuilder.class, 1, 1); graphBuilder.setTask("PartialModelBuilder",new PartialModelBuilder(), 1, 1);
graphBuilder.setTask("Predictor", Predictor.class, 1, 1); graphBuilder.setTask("Predictor",new Predictor(), 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1); graphBuilder.setSink("Sink",new Sink(), 1, 1);
graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder"); graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
graphBuilder.shuffleConnect("NewData", "Predictor"); graphBuilder.shuffleConnect("NewData", "Predictor");
......
...@@ -21,6 +21,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; ...@@ -21,6 +21,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState; import eu.stratosphere.streaming.state.MutableTableState;
public class WordCountCounter extends UserTaskInvokable { public class WordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>(); private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = ""; private String word = "";
......
...@@ -42,6 +42,7 @@ public class WordCountRemote { ...@@ -42,6 +42,7 @@ public class WordCountRemote {
private final static int recordsEmitted = 100000; private final static int recordsEmitted = 100000;
public static class WordCountDebugSource extends UserSourceInvokable { public static class WordCountDebugSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private PerformanceCounter perf = new PerformanceCounter("SourceEmitCounter", 1000, 10000, ""); private PerformanceCounter perf = new PerformanceCounter("SourceEmitCounter", 1000, 10000, "");
......
...@@ -19,6 +19,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; ...@@ -19,6 +19,7 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable { public class WordCountSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override @Override
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
......
...@@ -23,6 +23,7 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; ...@@ -23,6 +23,7 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSource extends UserSourceInvokable { public class WordCountSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null; private BufferedReader br = null;
private String line = new String(); private String line = new String();
......
...@@ -22,6 +22,7 @@ import eu.stratosphere.streaming.util.PerformanceCounter; ...@@ -22,6 +22,7 @@ import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer; import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountSplitter extends UserTaskInvokable { public class WordCountSplitter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {}; private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>()); private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
......
...@@ -87,7 +87,7 @@ public class RMQSource extends UserSourceInvokable { ...@@ -87,7 +87,7 @@ public class RMQSource extends UserSourceInvokable {
message = new String(delivery.getBody()); message = new String(delivery.getBody());
if (message.equals("quit")) { if (message.equals("q")) {
break; break;
} }
......
...@@ -27,7 +27,6 @@ import eu.stratosphere.streaming.util.LogUtils; ...@@ -27,7 +27,6 @@ import eu.stratosphere.streaming.util.LogUtils;
public class RMQTopology { public class RMQTopology {
public static class Sink extends UserSinkInvokable { public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -41,7 +40,7 @@ public class RMQTopology { ...@@ -41,7 +40,7 @@ public class RMQTopology {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE); JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1); graphBuilder.setSource("Source", new RMQSource("localhost", "hello"), 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1); graphBuilder.setSink("Sink", new Sink(), 1, 1);
graphBuilder.shuffleConnect("Source", "Sink"); graphBuilder.shuffleConnect("Source", "Sink");
......
...@@ -148,4 +148,8 @@ public class PerformanceTracker { ...@@ -148,4 +148,8 @@ public class PerformanceTracker {
} }
public void setFname(String fname) {
this.fname = fname;
}
} }
...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent; ...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api.streamcomponent;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.net.InetSocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -27,10 +26,6 @@ import org.junit.Test; ...@@ -27,10 +26,6 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册