提交 dac02696 编写于 作者: G Gao Yun 提交者: Stephan Ewen

[FLINK-12765][coordinator] Add the remaining resources to the slot selection strategy

上级 dd6508f9
......@@ -22,7 +22,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import javax.annotation.Nonnull;
......@@ -48,7 +47,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
@Override
public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
@Nonnull Collection<? extends SlotInfo> availableSlots,
@Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull SlotProfile slotProfile) {
Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
......@@ -67,12 +66,12 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
@Nonnull
private Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
@Nonnull Collection<? extends SlotInfo> availableSlots,
@Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull ResourceProfile resourceProfile) {
for (SlotInfo candidate : availableSlots) {
if (candidate.getResourceProfile().isMatching(resourceProfile)) {
return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED));
for (SlotInfoAndResources candidate : availableSlots) {
if (candidate.getRemainingResources().isMatching(resourceProfile)) {
return Optional.of(SlotInfoAndLocality.of(candidate.getSlotInfo(), Locality.UNCONSTRAINED));
}
}
return Optional.empty();
......@@ -80,7 +79,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
@Nonnull
private Optional<SlotInfoAndLocality> selectWitLocationPreference(
@Nonnull Collection<? extends SlotInfo> availableSlots,
@Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull Collection<TaskManagerLocation> locationPreferences,
@Nonnull ResourceProfile resourceProfile) {
......@@ -93,21 +92,21 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
}
SlotInfo bestCandidate = null;
SlotInfoAndResources bestCandidate = null;
Locality bestCandidateLocality = Locality.UNKNOWN;
int bestCandidateScore = Integer.MIN_VALUE;
for (SlotInfo candidate : availableSlots) {
for (SlotInfoAndResources candidate : availableSlots) {
if (candidate.getResourceProfile().isMatching(resourceProfile)) {
if (candidate.getRemainingResources().isMatching(resourceProfile)) {
// this gets candidate is local-weigh
Integer localWeigh = preferredResourceIDs.getOrDefault(
candidate.getTaskManagerLocation().getResourceID(), 0);
candidate.getSlotInfo().getTaskManagerLocation().getResourceID(), 0);
// this gets candidate is host-local-weigh
Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(
candidate.getTaskManagerLocation().getFQDNHostname(), 0);
candidate.getSlotInfo().getTaskManagerLocation().getFQDNHostname(), 0);
int candidateScore = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
if (candidateScore > bestCandidateScore) {
......@@ -122,7 +121,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
// at the end of the iteration, we return the candidate with best possible locality or null.
return bestCandidate != null ?
Optional.of(SlotInfoAndLocality.of(bestCandidate, bestCandidateLocality)) :
Optional.of(SlotInfoAndLocality.of(bestCandidate.getSlotInfo(), bestCandidateLocality)) :
Optional.empty();
}
}
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import javax.annotation.Nonnull;
......@@ -41,39 +40,39 @@ public enum PreviousAllocationSlotSelectionStrategy implements SlotSelectionStra
@Override
public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
@Nonnull Collection<? extends SlotInfo> availableSlots,
@Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull SlotProfile slotProfile) {
Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
// First, if there was a prior allocation try to schedule to the same/old slot
if (!priorAllocations.isEmpty()) {
for (SlotInfo availableSlot : availableSlots) {
if (priorAllocations.contains(availableSlot.getAllocationId())) {
for (SlotInfoAndResources availableSlot : availableSlots) {
if (priorAllocations.contains(availableSlot.getSlotInfo().getAllocationId())) {
return Optional.of(
SlotInfoAndLocality.of(availableSlot, Locality.LOCAL));
SlotInfoAndLocality.of(availableSlot.getSlotInfo(), Locality.LOCAL));
}
}
}
// Second, select based on location preference, excluding blacklisted allocations
Set<AllocationID> blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations();
Collection<? extends SlotInfo> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations);
Collection<SlotInfoAndResources> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations);
return LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile);
}
@Nonnull
private Collection<SlotInfo> computeWithoutBlacklistedSlots(
@Nonnull Collection<? extends SlotInfo> availableSlots,
private Collection<SlotInfoAndResources> computeWithoutBlacklistedSlots(
@Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull Set<AllocationID> blacklistedAllocations) {
if (blacklistedAllocations.isEmpty()) {
return Collections.unmodifiableCollection(availableSlots);
}
ArrayList<SlotInfo> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
for (SlotInfo availableSlot : availableSlots) {
if (!blacklistedAllocations.contains(availableSlot.getAllocationId())) {
ArrayList<SlotInfoAndResources> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
for (SlotInfoAndResources availableSlot : availableSlots) {
if (!blacklistedAllocations.contains(availableSlot.getSlotInfo().getAllocationId())) {
availableAndAllowedSlots.add(availableSlot);
}
}
......
......@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.ExceptionUtils;
......@@ -51,6 +50,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
/**
* Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
......@@ -293,7 +293,11 @@ public class SchedulerImpl implements Scheduler {
@Nonnull SlotRequestId slotRequestId,
@Nonnull SlotProfile slotProfile) {
Collection<SlotInfo> slotInfoList = slotPool.getAvailableSlotsInformation();
Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
slotPool.getAvailableSlotsInformation()
.stream()
.map(SlotSelectionStrategy.SlotInfoAndResources::new)
.collect(Collectors.toList());
Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);
......@@ -479,7 +483,8 @@ public class SchedulerImpl implements Scheduler {
boolean allowQueuedScheduling,
@Nullable Time allocationTimeout) throws NoResourceAvailableException {
Collection<SlotInfo> resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId);
Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
slotSharingManager.listResolvedRootSlotInfo(groupId);
SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.SlotInfo;
......@@ -36,14 +37,44 @@ public interface SlotSelectionStrategy {
* Selects the best {@link SlotInfo} w.r.t. a certain selection criterion from the provided list of available slots
* and considering the given {@link SlotProfile} that describes the requirements.
*
* @param availableSlots a list of the available slots to select from.
* @param availableSlots a list of the available slots together with their remaining resources to select from.
* @param slotProfile a slot profile, describing requirements for the slot selection.
* @return the selected slot info with the corresponding locality hint.
*/
Optional<SlotInfoAndLocality> selectBestSlotForProfile(
@Nonnull Collection<? extends SlotInfo> availableSlots,
@Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull SlotProfile slotProfile);
/**
* This class is a value type that combines a {@link SlotInfo} with its remaining {@link ResourceProfile}.
*/
final class SlotInfoAndResources {
@Nonnull
private final SlotInfo slotInfo;
@Nonnull
private final ResourceProfile remainingResources;
public SlotInfoAndResources(@Nonnull SlotInfo slotInfo) {
this(slotInfo, slotInfo.getResourceProfile());
}
public SlotInfoAndResources(@Nonnull SlotInfo slotInfo, @Nonnull ResourceProfile remainingResources) {
this.slotInfo = slotInfo;
this.remainingResources = remainingResources;
}
@Nonnull
public SlotInfo getSlotInfo() {
return slotInfo;
}
@Nonnull
public ResourceProfile getRemainingResources() {
return remainingResources;
}
}
/**
* This class is a value type that combines a {@link SlotInfo} with a {@link Locality} hint.
......
......@@ -180,14 +180,18 @@ public class SlotSharingManager {
}
@Nonnull
public Collection<SlotInfo> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
public Collection<SlotSelectionStrategy.SlotInfoAndResources> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
return resolvedRootSlots
.values()
.stream()
.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
.filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
.map((MultiTaskSlot multiTaskSlot) -> (SlotInfo) multiTaskSlot.getSlotContextFuture().join())
.collect(Collectors.toList());
.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
.filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
.map((MultiTaskSlot multiTaskSlot) -> {
SlotInfo slotInfo = multiTaskSlot.getSlotContextFuture().join();
return new SlotSelectionStrategy.SlotInfoAndResources(
slotInfo,
slotInfo.getResourceProfile().subtract(multiTaskSlot.getReservedResources()));
}).collect(Collectors.toList());
}
@Nullable
......
......@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Collectors;
public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyTestBase {
......@@ -61,7 +62,11 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt
SlotProfile slotProfile = new SlotProfile(ResourceProfile.UNKNOWN, Collections.emptyList(), Collections.emptySet());
Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
Assert.assertTrue(candidates.contains(match.get().getSlotInfo()));
Assert.assertTrue(
candidates.stream()
.map(SlotSelectionStrategy.SlotInfoAndResources::getSlotInfo)
.collect(Collectors.toList())
.contains(match.get().getSlotInfo()));
}
@Test
......@@ -70,7 +75,11 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tmlX), Collections.emptySet());
Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
Assert.assertTrue(candidates.contains(match.get().getSlotInfo()));
Assert.assertTrue(
candidates.stream()
.map(SlotSelectionStrategy.SlotInfoAndResources::getSlotInfo)
.collect(Collectors.toList())
.contains(match.get().getSlotInfo()));
}
@Test
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.clusterframework.types;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
......@@ -56,7 +55,7 @@ public abstract class SlotSelectionStrategyTestBase extends TestLogger {
protected final SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile);
protected final SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile);
protected final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates());
protected final Set<SlotSelectionStrategy.SlotInfoAndResources> candidates = Collections.unmodifiableSet(createCandidates());
protected final SlotSelectionStrategy selectionStrategy;
......@@ -64,12 +63,12 @@ public abstract class SlotSelectionStrategyTestBase extends TestLogger {
this.selectionStrategy = slotSelectionStrategy;
}
private Set<SlotContext> createCandidates() {
Set<SlotContext> candidates = new HashSet<>(4);
candidates.add(ssc1);
candidates.add(ssc2);
candidates.add(ssc3);
candidates.add(ssc4);
private Set<SlotSelectionStrategy.SlotInfoAndResources> createCandidates() {
Set<SlotSelectionStrategy.SlotInfoAndResources> candidates = new HashSet<>(4);
candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc1));
candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc2));
candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc3));
candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc4));
return candidates;
}
......
......@@ -331,6 +331,95 @@ public class SlotPoolSlotSharingTest extends TestLogger {
assertEquals(allocationId2, logicalSlot3.getAllocationId());
}
/**
* Tests that when matching from the allocated slot, the remaining resources of the slot
* will be used instead of the total resource.
*/
@Test
public void testSlotSharingRespectsRemainingResource() throws Exception {
final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
final ResourceProfile largeRequestResource = new ResourceProfile(2.0, 200);
final ResourceProfile smallRequestResource = new ResourceProfile(1.0, 100);
final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
final JobVertexID jobVertexId1 = new JobVertexID();
final JobVertexID jobVertexId2 = new JobVertexID();
final JobVertexID jobVertexId3 = new JobVertexID();
final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId1,
slotSharingGroupId,
null),
true,
SlotProfile.noLocality(largeRequestResource),
TestingUtils.infiniteTime());
final AllocationID allocationId1 = allocationIds.take();
// This should fulfill the first request.
boolean offerFuture = slotPool.offerSlot(
taskManagerLocation,
new SimpleAckingTaskManagerGateway(),
new SlotOffer(
allocationId1,
0,
allocatedSlotRp));
assertTrue(offerFuture);
assertTrue(logicalSlotFuture1.isDone());
assertEquals(allocationId1, logicalSlotFuture1.get().getAllocationId());
// The second request should not share the same slot with the first request since it is large.
CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId2,
slotSharingGroupId,
null),
true,
SlotProfile.noLocality(largeRequestResource),
TestingUtils.infiniteTime());
assertFalse(logicalSlotFuture2.isDone());
// The third request should be able to share the same slot with the first request since it is small.
CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId3,
slotSharingGroupId,
null),
true,
SlotProfile.noLocality(smallRequestResource),
TestingUtils.infiniteTime());
assertTrue(logicalSlotFuture3.isDone());
assertEquals(allocationId1, logicalSlotFuture1.get().getAllocationId());
// The second request should be finally fulfilled by a new slot.
final AllocationID allocationId2 = allocationIds.take();
// This should fulfill the first two requests.
offerFuture = slotPool.offerSlot(
taskManagerLocation,
new SimpleAckingTaskManagerGateway(),
new SlotOffer(
allocationId2,
0,
allocatedSlotRp));
assertTrue(offerFuture);
assertTrue(logicalSlotFuture2.isDone());
assertEquals(allocationId2, logicalSlotFuture2.get().getAllocationId());
}
@Test
public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
......
......@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......@@ -439,12 +438,12 @@ public class SlotSharingManagerTest extends TestLogger {
AbstractID groupId = new AbstractID();
Collection<SlotInfo> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
Assert.assertEquals(1, slotInfos.size());
SlotInfo slotInfo = slotInfos.iterator().next();
SlotSelectionStrategy.SlotInfoAndResources slotInfoAndRemainingResource = slotInfos.iterator().next();
SlotSharingManager.MultiTaskSlot resolvedMultiTaskSlot =
slotSharingManager.getResolvedRootSlot(slotInfo);
slotSharingManager.getResolvedRootSlot(slotInfoAndRemainingResource.getSlotInfo());
SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality =
LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(slotInfos, SlotProfile.noRequirements()).get();
......@@ -501,7 +500,7 @@ public class SlotSharingManagerTest extends TestLogger {
SlotProfile slotProfile = SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, Collections.singleton(taskManagerLocation));
Collection<SlotInfo> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality =
LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(slotInfos, slotProfile).get();
SlotSharingManager.MultiTaskSlot resolvedRootSlot = slotSharingManager.getResolvedRootSlot(slotInfoAndLocality.getSlotInfo());
......@@ -619,6 +618,51 @@ public class SlotSharingManagerTest extends TestLogger {
assertEquals(ResourceProfile.ZERO, unresolvedRootSlot.getReservedResources());
}
@Test
public void testGetResolvedSlotWithResourceConfigured() {
ResourceProfile rp1 = new ResourceProfile(1.0, 100);
ResourceProfile rp2 = new ResourceProfile(2.0, 200);
ResourceProfile allocatedSlotRp = new ResourceProfile(5.0, 500);
final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
SlotSharingManager slotSharingManager = new SlotSharingManager(
SLOT_SHARING_GROUP_ID,
allocatedSlotActions,
SLOT_OWNER);
SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(
new SlotRequestId(),
CompletableFuture.completedFuture(
new SimpleSlotContext(
new AllocationID(),
new LocalTaskManagerLocation(),
0,
new SimpleAckingTaskManagerGateway(),
allocatedSlotRp)),
new SlotRequestId());
rootSlot.allocateSingleTaskSlot(
new SlotRequestId(),
rp1,
new SlotSharingGroupId(),
Locality.LOCAL);
Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRoots =
slotSharingManager.listResolvedRootSlotInfo(new AbstractID());
assertEquals(1, resolvedRoots.size());
assertEquals(allocatedSlotRp.subtract(rp1), resolvedRoots.iterator().next().getRemainingResources());
rootSlot.allocateSingleTaskSlot(
new SlotRequestId(),
rp2,
new SlotSharingGroupId(),
Locality.LOCAL);
resolvedRoots = slotSharingManager.listResolvedRootSlotInfo(new AbstractID());
assertEquals(1, resolvedRoots.size());
assertEquals(allocatedSlotRp.subtract(rp1).subtract(rp2), resolvedRoots.iterator().next().getRemainingResources());
}
@Test
public void testHashEnoughResourceOfMultiTaskSlot() {
ResourceProfile rp1 = new ResourceProfile(1.0, 100);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册