提交 9b34c8d2 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] wordcount cluster test

上级 4b03b9bb
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.wordcount;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.LogUtils;
public class WordCountCluster {
protected final Configuration config;
public WordCountCluster() {
this(new Configuration());
}
public WordCountCluster(Configuration config) {
this.config = config;
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
try {
JobGraph jG = getJobGraph();
int jobManagerRpcPort = 6123;
Configuration configuration = jG.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "hadoop02.ilab.sztaki.hu");
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
JobClient client= new JobClient(jG, configuration);
//JobClient client = exec.getJobClient(jG);
client.submitJobAndWait();
} catch (Exception e) {
System.out.println(e);
}
}
}
......@@ -21,40 +21,39 @@ import org.apache.log4j.Level;
import org.junit.Assert;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.LogUtils;
public class WordCountLocal {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
protected final Configuration config;
private NepheleMiniCluster executor;
public WordCountLocal() {
this(new Configuration());
}
public WordCountLocal(Configuration config) {
verifyJvmOptions();
this.config = config;
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
Assert.assertTrue("Insufficient java heap space " + heap
+ "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m",
heap > MINIMUM_HEAP_SIZE_MB - 50);
}
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
......@@ -79,35 +78,32 @@ public class WordCountLocal {
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
}
catch(Exception e) {
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = null;
try {
client = this.executor.getJobClient(jobGraph); }
catch(Exception e) {
client = this.executor.getJobClient(jobGraph);
} catch (Exception e) {
System.err.println("here");
}
client.submitJobAndWait();
}
catch(Exception e) {
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
}
protected JobGraph getJobGraph() throws Exception {
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......@@ -119,17 +115,36 @@ public class WordCountLocal {
return graphBuilder.getJobGraph();
}
public static void main(String[] args){
public static void main(String[] args) {
WordCountLocal wC = new WordCountLocal();
BasicConfigurator.configure();
NepheleMiniCluster exec = new NepheleMiniCluster();
try {
wC.startCluster();
wC.runJob();
wC.stopCluster();
} catch (Exception e) {}
JobGraph jG = getJobGraph();
int jobManagerRpcPort = 6498;
Configuration configuration = jG.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
JobClient client= new JobClient(jG, configuration);
exec.start();
//JobClient client = exec.getJobClient(jG);
client.submitJobAndWait();
exec.stop();
} catch (Exception e) {
System.out.println(e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册