提交 ef76b59c 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] wordcount cluster settings updated

上级 98e087fd
...@@ -15,13 +15,18 @@ ...@@ -15,13 +15,18 @@
package eu.stratosphere.streaming.test.wordcount; package eu.stratosphere.streaming.test.wordcount;
import java.io.File;
import java.net.InetSocketAddress;
import org.apache.log4j.BasicConfigurator; import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster; import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.JobWithJars; import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
...@@ -30,10 +35,8 @@ import eu.stratosphere.util.LogUtils; ...@@ -30,10 +35,8 @@ import eu.stratosphere.util.LogUtils;
public class WordCountCluster { public class WordCountCluster {
protected final Configuration config; protected final Configuration config;
public WordCountCluster() { public WordCountCluster() {
this(new Configuration()); this(new Configuration());
} }
...@@ -60,29 +63,25 @@ public class WordCountCluster { ...@@ -60,29 +63,25 @@ public class WordCountCluster {
} }
public static void main(String[] args) { public static void main(String[] args) {
try { try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph(); JobGraph jG = getJobGraph();
int jobManagerRpcPort = 6123; jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration(); Configuration configuration = jG.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "hadoop02.ilab.sztaki.hu");
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort); Client client = new Client(new InetSocketAddress(
configuration.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 6122); "hadoop02.ilab.sztaki.hu", 6123), configuration);
// configuration.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 7501);
client.run(null, jG, true);
JobClient client= new JobClient(jG, configuration);
ClassLoader userClassLoader;
client.submitJobAndWait();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); System.out.println(e);
} }
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.test.wordcount; package eu.stratosphere.streaming.test.wordcount;
import java.io.File;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
...@@ -22,10 +23,12 @@ import org.apache.log4j.BasicConfigurator; ...@@ -22,10 +23,12 @@ import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Assert; import org.junit.Assert;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.client.minicluster.NepheleMiniCluster; import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client; import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
...@@ -40,7 +43,6 @@ public class WordCountLocal { ...@@ -40,7 +43,6 @@ public class WordCountLocal {
private NepheleMiniCluster executor; private NepheleMiniCluster executor;
public WordCountLocal() { public WordCountLocal() {
this(new Configuration()); this(new Configuration());
} }
...@@ -107,7 +109,7 @@ public class WordCountLocal { ...@@ -107,7 +109,7 @@ public class WordCountLocal {
private static JobGraph getJobGraph() throws Exception { private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class); graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2); graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2); graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.setSink("WordCountSink", WordCountSink.class);
...@@ -121,33 +123,25 @@ public class WordCountLocal { ...@@ -121,33 +123,25 @@ public class WordCountLocal {
} }
public static void main(String[] args) { public static void main(String[] args) {
WordCountLocal wC = new WordCountLocal();
BasicConfigurator.configure();
NepheleMiniCluster exec = new NepheleMiniCluster(); NepheleMiniCluster exec = new NepheleMiniCluster();
try { try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph(); JobGraph jG = getJobGraph();
jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration(); Configuration configuration = jG.getJobConfiguration();
Client client= new Client(new InetSocketAddress("localhost", 6498), configuration);
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
// int jobManagerRpcPort = 6498;
//
// configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
// configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
//
//
// JobClient client= new JobClient(jG, configuration);
exec.start(); exec.start();
//JobClient client = exec.getJobClient(jG);
// client.run(null, jG, true);
// client.submitJobAndWait(); client.run(null, jG, true);
exec.stop(); exec.stop();
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册