Commit 348e85bb authored by Gyula Fora, committed by Stephan Ewen

[streaming] cluster testing

Parent ffdbb030
......@@ -61,7 +61,7 @@ public class AckEventListener implements EventListener {
Long nt = System.nanoTime();
recordBuffer.ackRecord(ackEvent.getRecordId());
log.debug("Ack processed " + ackEvent.getRecordId() + " with exec. time (ns): " + (System.nanoTime() - nt));
log.debug("ACK PROCESSED: " + ackEvent.getRecordId() + " exec. time (ns): " + (System.nanoTime() - nt));
}
}
}
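Review note on the timing added here: the System.nanoTime() bookkeeping and the string concatenation are paid even when debug logging is disabled. Both commons-logging and log4j loggers expose isDebugEnabled(); a minimal sketch of the usual guard, not part of this commit:

if (log.isDebugEnabled()) {
    long start = System.nanoTime();
    recordBuffer.ackRecord(ackEvent.getRecordId());
    log.debug("ACK PROCESSED: " + ackEvent.getRecordId()
            + " exec. time (ns): " + (System.nanoTime() - start));
} else {
    recordBuffer.ackRecord(ackEvent.getRecordId());
}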
......@@ -57,7 +57,7 @@ public class FailEventListener implements EventListener {
String failCID = recordId.split("-", 2)[0];
if (failCID.equals(taskInstanceID)) {
recordBuffer.failRecord(recordId);
log.warn("Fail recieved: " + recordId + " conaining: " + recordBuffer.getRecordBuffer());
log.warn("FAIL RECIEVED: " + recordId + " containing: " + recordBuffer.getRecordBuffer());
//log.trace(recordBuffer.getRecordBuffer());
}
......
......@@ -224,9 +224,9 @@ public class FaultToleranceBuffer {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(record);
log.warn("Re-emitted: " + record.getId());
log.warn("RE-EMITTED: " + record.getId());
} catch (Exception e) {
log.error("Re-emit failed, avoiding record: " + record.getId());
log.error("RE-EMIT FAILED, avoiding record: " + record.getId());
}
}
......
......@@ -68,6 +68,7 @@ public class JobGraphBuilder {
numberOfInstances = new HashMap<String, Integer>();
numberOfOutputChannels = new HashMap<String, Integer>();
log.debug("JobGraph created");
}
/**
......
......@@ -49,10 +49,10 @@ public abstract class StreamInvokableComponent {
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
log.debug("Record " + record.getId()+" emitted from "+name);
log.debug("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
log.warn("Emit error: " + e.getMessage()+ "at "+name);
log.warn("EMIT ERROR: " + e.getMessage() + " -- " + name);
emittedRecords.failRecord(record.getId());
}
}
......
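Worth flagging for review: in this emit loop the try block wraps the whole for loop, so the first output whose emit(...) throws aborts the emits to the remaining outputs and the record is failed once. In the FaultToleranceBuffer re-emit loop above, by contrast, the try sits inside the loop and each output fails independently. If per-output isolation were wanted here too, the shape would be the following sketch (not what this commit does; failRecord would then have to tolerate repeated calls for the same id):

for (RecordWriter<StreamRecord> output : outputs) {
    try {
        output.emit(record);
        log.debug("EMITTED: " + record.getId() + " -- " + name);
    } catch (Exception e) {
        log.warn("EMIT ERROR: " + e.getMessage() + " -- " + name);
        emittedRecords.failRecord(record.getId());
    }
}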
......@@ -72,10 +72,10 @@ public class StreamSink extends AbstractOutputTask {
try {
userFunction.invoke(rec);
streamSinkHelper.threadSafePublish(new AckEvent(id), input);
log.debug("Ack sent from " + name + ": " + id);
log.debug("ACK: " + id + " -- " + name);
} catch (Exception e) {
streamSinkHelper.threadSafePublish(new FailEvent(id), input);
log.warn("Invoking record " + id + " at " + name + " failed due to " + e.getMessage());
log.warn("INVOKE FAILED: " + id + " -- " + name + " -- due to " + e.getMessage());
}
}
......
......@@ -79,7 +79,6 @@ public class StreamTask extends AbstractTask {
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
}
// TODO: log userfunction name
@Override
public void invoke() throws Exception {
log.debug("Task " + name + " invoked with instance id " + taskInstanceID);
......@@ -96,10 +95,10 @@ public class StreamTask extends AbstractTask {
try {
userFunction.invoke(streamRecord);
streamTaskHelper.threadSafePublish(new AckEvent(id), input);
log.debug("Ack sent from " + name + ": " + id);
log.debug("ACK: " + id + " -- " + name);
} catch (Exception e) {
streamTaskHelper.threadSafePublish(new FailEvent(id), input);
log.warn("Invoking record " + id + " failed due to " + e.getMessage());
log.warn("INVOKE FAILED: " + id + " -- " + name + " -- due to " + e.getMessage());
}
}
}
......
......@@ -113,7 +113,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @return The StreamRecord object
*/
public StreamRecord setId(String channelID) {
uid.setValue(channelID + "-" + rnd.nextInt(1000));
uid.setValue(channelID + "-" + rnd.nextInt(1000000));
return this;
}
......
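The widened suffix in setId is about collision probability: record ids are the channelID plus a random int, and acks and fails are keyed by this id. By the birthday bound, a 1,000-value suffix gives roughly a 50% chance of a duplicate after only about 38 records on one channel; 1,000,000 values push that point out to about 1,180 records, but do not eliminate it. A collision-free alternative, shown purely as a sketch (neither helper exists in this codebase):

import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public class RecordIds {
    private static final AtomicLong SEQ = new AtomicLong();

    // Monotonic counter: unique within one JVM, cheap to generate.
    public static String counterId(String channelID) {
        return channelID + "-" + SEQ.incrementAndGet();
    }

    // Random UUID: unique across JVMs, at the cost of longer keys.
    public static String uuidId(String channelID) {
        return channelID + "-" + UUID.randomUUID();
    }
}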
......@@ -48,12 +48,16 @@ public class CellInfo {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", InfoSourceInvokable.class);
// graphBuilder.setSource("infoSource2", InfoSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", CellTaskInvokable.class, 2);
// graphBuilder.setSource("querySource2", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", CellTaskInvokable.class, 3);
graphBuilder.setSink("sink", CellSinkInvokable.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask",0, IntValue.class);
// graphBuilder.fieldsConnect("infoSource2", "cellTask", 0, IntValue.class);
// graphBuilder.fieldsConnect("querySource2", "cellTask",0, IntValue.class);
graphBuilder.shuffleConnect("cellTask", "sink");
return graphBuilder.getJobGraph();
......
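For readers of this test graph: fieldsConnect keys the connection on the given field (field 0 here, the IntValue cell id), so records carrying the same cell id are routed to the same cellTask instance, while shuffleConnect distributes records without regard to content. The wiring above, annotated (same calls as in the diff; the second source pair stays commented out, as in the commit):

graphBuilder.setTask("cellTask", CellTaskInvokable.class, 3); // parallelism raised from 2 to 3
// Key both inputs on field 0 so one cell's info and queries meet in one task instance.
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask", 0, IntValue.class);
// The sink does not depend on record content, so a plain shuffle suffices.
graphBuilder.shuffleConnect("cellTask", "sink");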
......@@ -23,11 +23,13 @@ public class CellSinkInvokable extends UserSinkInvokable {
int c=0;
@Override
public void invoke(StreamRecord record) throws Exception {
// c++;
// if((c % 10000)==0){
// System.out.println(c);
// }
c++;
}
@Override
public String getResult(){
return String.valueOf(c);
}
}
......@@ -33,7 +33,7 @@ public class InfoSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 100000; i++) {
for (int i = 0; i < 500000; i++) {
cellId.setValue(_rand.nextInt(_cellNumber));
timeStamp.setValue(System.currentTimeMillis());
......
......@@ -30,12 +30,19 @@ public class WordCountCounter extends UserTaskInvokable {
private IntValue countValue = new IntValue();
private String word = new String();
private int count = 1;
private int i = 0;
private long time;
private long prevTime = System.currentTimeMillis();
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getRecord(0)[0];
word = wordValue.getValue();
i++;
if (i % 50000 == 0) {
time= System.currentTimeMillis();
System.out.println("Counter:\t" + i + "\t----Time: "+(time-prevTime));
prevTime=time;
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......
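The source, splitter, counter, and sink in this commit all gain the same inlined throughput probe: count invocations and print the wall-clock time of every 50,000-record batch. Factored out, the pattern is roughly the following sketch (no such helper exists in the patch; each invokable would call tick() at the top of invoke):

public class ThroughputProbe {
    private final String label;
    private final int interval;
    private int count = 0;
    private long prevTime = System.currentTimeMillis();

    public ThroughputProbe(String label, int interval) {
        this.label = label;
        this.interval = interval;
    }

    // Call once per record; prints elapsed millis per `interval` records.
    public void tick() {
        if (++count % interval == 0) {
            long now = System.currentTimeMillis();
            System.out.println(label + ":\t" + count + "\t----Time: " + (now - prevTime));
            prevTime = now;
        }
    }
}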
......@@ -31,7 +31,7 @@ public class WordCountDummySource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 10000000; i++) {
for (int i = 0; i < 1000; i++) {
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
} else {
......
......@@ -15,40 +15,38 @@
package eu.stratosphere.streaming.test.wordcount;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class WordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.DEBUG);
public class WordCountDummySource2 extends UserSourceInvokable {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
private StringValue lineValue = new StringValue("");
StreamRecord record = new StreamRecord(lineValue);
private long time;
private long prevTime = System.currentTimeMillis();
public WordCountDummySource2() {
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
}
return graphBuilder.getJobGraph();
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000000; i++) {
if (i % 50000 == 0) {
time= System.currentTimeMillis();
System.out.println("Source:\t" + i + "\t----Time: "+(time-prevTime));
prevTime=time;
}
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
} else {
lineValue.setValue("Gabor Gyula");
}
record.setRecord(lineValue);
emit(record);
}
}
}
......@@ -15,99 +15,22 @@
package eu.stratosphere.streaming.test.wordcount;
import java.io.File;
import java.net.InetSocketAddress;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.junit.Assert;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.LogUtils;
public class WordCountLocal {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
protected final Configuration config;
private NepheleMiniCluster executor;
public WordCountLocal() {
this(new Configuration());
}
public WordCountLocal(Configuration config) {
verifyJvmOptions();
this.config = config;
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap
+ "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m",
heap > MINIMUM_HEAP_SIZE_MB - 50);
}
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
this.executor.start();
}
public void stopCluster() throws Exception {
try {
if (this.executor != null) {
this.executor.stop();
this.executor = null;
FileSystem.closeAll();
System.gc();
}
} finally {
}
}
public void runJob() throws Exception {
// submit job
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = null;
try {
client = this.executor.getJobClient(jobGraph);
} catch (Exception e) {
System.err.println("here");
}
client.submitJobAndWait();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
......@@ -125,35 +48,46 @@ public class WordCountLocal {
public static void main(String[] args) {
NepheleMiniCluster exec = new NepheleMiniCluster();
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.ERROR);
try {
root.setLevel(Level.DEBUG);
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
jG.addJar(new Path(file.getAbsolutePath()));
if (args.length == 0) {
args = new String[] { "local" };
}
Configuration configuration = jG.getJobConfiguration();
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
client.run(null, jG, true);
exec.start();
exec.stop();
client.run(null, jG, true);
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(null, jG, true);
}
exec.stop();
} catch (Exception e) {
System.out.println(e);
}
......
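The reworked main() in WordCountLocal now dispatches on args[0]: "local" starts a NepheleMiniCluster and submits through a Client bound to localhost:6498, while "cluster" submits to the remote JobManager at hadoop02.ilab.sztaki.hu:6123. Condensed from the diff above, with error handling trimmed, the control flow appears to be:

if (args.length == 0) {
    args = new String[] { "local" };
}
Configuration configuration = jG.getJobConfiguration();
if (args[0].equals("local")) {
    NepheleMiniCluster exec = new NepheleMiniCluster();
    exec.start();
    Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
    client.run(null, jG, true); // blocking submission
    exec.stop();
} else if (args[0].equals("cluster")) {
    Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
    client.run(null, jG, true);
}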
......@@ -30,27 +30,15 @@ import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.LogUtils;
public class WordCountCluster {
public class WordCountRemote {
protected final Configuration config;
public WordCountCluster() {
this(new Configuration());
}
public WordCountCluster(Configuration config) {
this.config = config;
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSource("WordCountSource", WordCountDummySource2.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 4);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
......@@ -62,10 +50,20 @@ public class WordCountCluster {
}
public static void main(String[] args) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.DEBUG);
try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
File file = new File(
"target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph();
......@@ -78,6 +76,7 @@ public class WordCountCluster {
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(null, jG, true);
} catch (Exception e) {
System.out.println(e);
......
......@@ -21,9 +21,16 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
int nrOfRecords=0;
private long time;
private long prevTime = System.currentTimeMillis();
@Override
public void invoke(StreamRecord record) throws Exception {
nrOfRecords++;
if (nrOfRecords % 50000 == 0) {
time= System.currentTimeMillis();
System.out.println("Sink:\t" + nrOfRecords + "\t----Time: "+(time-prevTime));
prevTime=time;
}
}
@Override
......
......@@ -24,18 +24,27 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue("");
private StreamRecord outputRecord = new StreamRecord(wordValue);
private int i = 0;
private StreamRecord outputRecord = new StreamRecord(wordValue);
private long time;
private long prevTime = System.currentTimeMillis();
@Override
public void invoke(StreamRecord record) throws Exception {
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Splitter:\t" + i + "\t----Time: "
+ (time - prevTime));
prevTime=time;
}
sentence = (StringValue) record.getRecord(0)[0];
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setRecord(wordValue);
emit(outputRecord);
//emit(new StreamRecord(wordValue));
// emit(new StreamRecord(wordValue));
}
}
}
\ No newline at end of file