提交 24a9b1e5 编写于 作者: U Ufuk Celebi

Wait for slots instead of TaskManagers in test environment

Instead of waiting for the number of connected task managers, the local
mini cluster used for tests now waits for the total number of available
slots. Waiting for the number of connected task managers instead of the
available slots might result in races in rare situations.

In addition, rename task tracker (sic) to task manager in test classes.
上级 582cd030
......@@ -48,7 +48,7 @@ public class ClusterUtil {
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.setMemorySize(memorySize);
exec.setNumTaskTracker(numberOfTaskTrackers);
exec.setNumTaskManager(numberOfTaskTrackers);
if (LOG.isInfoEnabled()) {
LOG.info("Running on mini cluster");
}
......
......@@ -62,7 +62,7 @@ public class NepheleMiniCluster {
private int taskManagerDataPort = DEFAULT_TM_DATA_PORT;
private int numTaskTracker = DEFAULT_NUM_TASK_MANAGER;
private int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
......@@ -157,9 +157,9 @@ public class NepheleMiniCluster {
this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}
public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
public int getNumTaskTracker() { return numTaskTracker; }
public int getNumTaskManager() { return numTaskManager; }
public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
......@@ -202,7 +202,7 @@ public class NepheleMiniCluster {
} else {
Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskTracker);
defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager);
GlobalConfiguration.includeConfiguration(conf);
}
......@@ -226,7 +226,7 @@ public class NepheleMiniCluster {
// start the job manager
jobManager = new JobManager(ExecutionMode.LOCAL);
waitForJobManagerToBecomeReady(numTaskTracker);
waitForJobManagerToBecomeReady(taskManagerNumSlots * numTaskManager);
}
}
......@@ -243,8 +243,8 @@ public class NepheleMiniCluster {
// Network utility methods
// ------------------------------------------------------------------------
private void waitForJobManagerToBecomeReady(int numTaskManagers) throws InterruptedException {
while (jobManager.getNumberOfTaskManagers() < numTaskManagers) {
private void waitForJobManagerToBecomeReady(int numSlots) throws InterruptedException {
while (jobManager.getNumberOfSlotsAvailableToScheduler() < numSlots) {
Thread.sleep(50);
}
}
......
......@@ -55,7 +55,7 @@ public abstract class AbstractTestBase {
protected static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
protected static final int DEFAULT_NUM_TASK_TRACKER = 1;
protected static final int DEFAULT_NUM_TASK_MANAGER = 1;
protected final Configuration config;
......@@ -65,7 +65,7 @@ public abstract class AbstractTestBase {
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
protected int numTaskTracker = DEFAULT_NUM_TASK_TRACKER;
protected int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
public AbstractTestBase(Configuration config) {
verifyJvmOptions();
......@@ -89,7 +89,7 @@ public abstract class AbstractTestBase {
this.executor.setLazyMemoryAllocation(true);
this.executor.setMemorySize(TASK_MANAGER_MEMORY_SIZE);
this.executor.setTaskManagerNumSlots(taskManagerNumSlots);
this.executor.setNumTaskTracker(this.numTaskTracker);
this.executor.setNumTaskManager(this.numTaskManager);
this.executor.start();
}
......@@ -115,9 +115,9 @@ public abstract class AbstractTestBase {
public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
public int getNumTaskTracker() { return numTaskTracker; }
public int getNumTaskManager() { return numTaskManager; }
public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
// --------------------------------------------------------------------------------------------
......
......@@ -29,7 +29,7 @@ public class WordCountITCase extends JavaProgramTestBase {
public WordCountITCase(){
setDegreeOfParallelism(4);
setNumTaskTracker(2);
setNumTaskManager(2);
setTaskManagerNumSlots(2);
}
......
......@@ -59,7 +59,7 @@ public class PackagedProgramEndToEndITCase {
String jarPath = "target/maven-test-jar.jar";
// run KMeans
cluster.setNumTaskTracker(2);
cluster.setNumTaskManager(2);
cluster.setTaskManagerNumSlots(2);
cluster.start();
......
......@@ -82,7 +82,7 @@ public class NetworkStackThroughput {
throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
}
setNumTaskTracker(parallelism / numSlots);
setNumTaskManager(parallelism / numSlots);
setTaskManagerNumSlots(numSlots);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册