diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCluster.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCluster.java index 22d63a83fe877b84cc9fed4c35a1abb519288622..01d302d512309d6582e846e399f059a368baf3a3 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCluster.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCluster.java @@ -19,7 +19,6 @@ import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Level; import eu.stratosphere.client.minicluster.NepheleMiniCluster; -import eu.stratosphere.client.program.JobWithJars; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.client.JobClient; @@ -46,7 +45,7 @@ public class WordCountCluster { private static JobGraph getJobGraph() throws Exception { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); - graphBuilder.setSource("WordCountSource", WordCountDummySource.class); + graphBuilder.setSource("WordCountSource", WordCountSource.class); graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2); graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2); graphBuilder.setSink("WordCountSink", WordCountSink.class); @@ -69,17 +68,12 @@ public class WordCountCluster { Configuration configuration = jG.getJobConfiguration(); configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "hadoop02.ilab.sztaki.hu"); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort); - + configuration.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 6122); + // configuration.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 7501); JobClient client= new JobClient(jG, configuration); - - //JobClient client = exec.getJobClient(jG); - ClassLoader userClassLoader; - - - client.submitJobAndWait(); } catch (Exception e) { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java index 00d4927e85df717bc5866f274e55bbf55f7f7702..99396930ac5f2774676b9606372818e18a462dc1 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java @@ -15,12 +15,15 @@ package eu.stratosphere.streaming.test.wordcount; +import java.net.InetSocketAddress; + import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Level; import org.junit.Assert; import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.client.JobClient; @@ -37,6 +40,7 @@ public class WordCountLocal { private NepheleMiniCluster executor; + public WordCountLocal() { this(new Configuration()); } @@ -126,20 +130,24 @@ public class WordCountLocal { try { JobGraph jG = getJobGraph(); - - int jobManagerRpcPort = 6498; Configuration configuration = jG.getJobConfiguration(); - configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort); + Client client= new Client(new InetSocketAddress("localhost", 6498), configuration); - JobClient client= new JobClient(jG, configuration); +// int jobManagerRpcPort = 6498; +// +// configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); +// configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort); +// +// +// JobClient client= new JobClient(jG, configuration); exec.start(); //JobClient client = exec.getJobClient(jG); + client.run(null, jG, true); - client.submitJobAndWait(); +// client.submitJobAndWait(); exec.stop(); } catch (Exception e) {