提交 33e48032 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] wordcount cluster testing

上级 3d6462b5
...@@ -19,7 +19,6 @@ import org.apache.log4j.BasicConfigurator; ...@@ -19,7 +19,6 @@ import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster; import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobClient;
...@@ -46,7 +45,7 @@ public class WordCountCluster { ...@@ -46,7 +45,7 @@ public class WordCountCluster {
private static JobGraph getJobGraph() throws Exception { private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class); graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2); graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2); graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.setSink("WordCountSink", WordCountSink.class);
...@@ -69,17 +68,12 @@ public class WordCountCluster { ...@@ -69,17 +68,12 @@ public class WordCountCluster {
Configuration configuration = jG.getJobConfiguration(); Configuration configuration = jG.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "hadoop02.ilab.sztaki.hu"); configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "hadoop02.ilab.sztaki.hu");
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
configuration.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 6122);
// configuration.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 7501);
JobClient client= new JobClient(jG, configuration); JobClient client= new JobClient(jG, configuration);
//JobClient client = exec.getJobClient(jG);
ClassLoader userClassLoader;
client.submitJobAndWait(); client.submitJobAndWait();
} catch (Exception e) { } catch (Exception e) {
......
...@@ -15,12 +15,15 @@ ...@@ -15,12 +15,15 @@
package eu.stratosphere.streaming.test.wordcount; package eu.stratosphere.streaming.test.wordcount;
import java.net.InetSocketAddress;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.BasicConfigurator; import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Assert; import org.junit.Assert;
import eu.stratosphere.client.minicluster.NepheleMiniCluster; import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobClient;
...@@ -37,6 +40,7 @@ public class WordCountLocal { ...@@ -37,6 +40,7 @@ public class WordCountLocal {
private NepheleMiniCluster executor; private NepheleMiniCluster executor;
public WordCountLocal() { public WordCountLocal() {
this(new Configuration()); this(new Configuration());
} }
...@@ -126,20 +130,24 @@ public class WordCountLocal { ...@@ -126,20 +130,24 @@ public class WordCountLocal {
try { try {
JobGraph jG = getJobGraph(); JobGraph jG = getJobGraph();
int jobManagerRpcPort = 6498;
Configuration configuration = jG.getJobConfiguration(); Configuration configuration = jG.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); Client client= new Client(new InetSocketAddress("localhost", 6498), configuration);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
JobClient client= new JobClient(jG, configuration); // int jobManagerRpcPort = 6498;
//
// configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
// configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
//
//
// JobClient client= new JobClient(jG, configuration);
exec.start(); exec.start();
//JobClient client = exec.getJobClient(jG); //JobClient client = exec.getJobClient(jG);
client.run(null, jG, true);
client.submitJobAndWait(); // client.submitJobAndWait();
exec.stop(); exec.stop();
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册