[FLINK-7655] [flip6] Set fencing token to null if not leader

This commit changes the fencing behaviour such that a component which is not the
leader will set its fencing token to null. This distinction allows to throw different
exceptions depending on whether it is a token mismatch or whether the receiver has
no fencing token set (== not being the leader).

This closes #4689.
上级 a86b6468
......@@ -94,7 +94,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
Optional<String> restAddress) throws Exception {
super(rpcService, endpointId, DispatcherId.generate());
super(rpcService, endpointId);
this.configuration = Preconditions.checkNotNull(configuration);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
......@@ -399,7 +399,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e);
}
setFencingToken(DispatcherId.generate());
// clear the fencing token indicating that we don't have the leadership right now
setFencingToken(null);
});
}
......
......@@ -212,7 +212,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
FatalErrorHandler errorHandler,
ClassLoader userCodeLoader) throws Exception {
super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), JobMasterId.INITIAL_JOB_MASTER_ID);
super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
selfGateway = getSelfGateway(JobMasterGateway.class);
......@@ -735,7 +735,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return Acknowledge.get();
}
if (!Objects.equals(getFencingToken(), JobMasterId.INITIAL_JOB_MASTER_ID)) {
if (getFencingToken() != null) {
log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), newJobMasterId);
// first we have to suspend the current execution
......@@ -791,13 +791,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private Acknowledge suspendExecution(final Throwable cause) {
validateRunsInMainThread();
if (Objects.equals(JobMasterId.INITIAL_JOB_MASTER_ID, getFencingToken())) {
if (getFencingToken() == null) {
log.debug("Job has already been suspended or shutdown.");
return Acknowledge.get();
}
// not leader anymore --> set the JobMasterId to the initial id
setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID);
// not leader anymore --> set the JobMasterId to null
setFencingToken(null);
try {
resourceManagerLeaderRetriever.stop();
......
......@@ -29,8 +29,6 @@ public class JobMasterId extends AbstractID {
private static final long serialVersionUID = -933276753644003754L;
public static final JobMasterId INITIAL_JOB_MASTER_ID = new JobMasterId(0L, 0L);
public JobMasterId(byte[] bytes) {
super(bytes);
}
......
......@@ -139,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(rpcService, resourceManagerEndpointId, ResourceManagerId.generate());
super(rpcService, resourceManagerEndpointId);
this.resourceId = checkNotNull(resourceId);
this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
......@@ -772,13 +772,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
public void revokeLeadership() {
runAsyncWithoutFencing(
() -> {
final ResourceManagerId newResourceManagerId = ResourceManagerId.generate();
log.info("ResourceManager {} was revoked leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId);
log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
clearState();
setFencingToken(newResourceManagerId);
setFencingToken(null);
slotManager.suspend();
});
......
......@@ -19,7 +19,8 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.UUID;
......@@ -39,25 +40,26 @@ public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {
private volatile F fencingToken;
private volatile MainThreadExecutor fencedMainThreadExecutor;
protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F initialFencingToken) {
protected FencedRpcEndpoint(RpcService rpcService, String endpointId) {
super(rpcService, endpointId);
this.fencingToken = Preconditions.checkNotNull(initialFencingToken);
// no fencing token == no leadership
this.fencingToken = null;
this.fencedMainThreadExecutor = new MainThreadExecutor(
getRpcService().fenceRpcServer(
rpcServer,
initialFencingToken));
null));
}
protected FencedRpcEndpoint(RpcService rpcService, F initialFencingToken) {
this(rpcService, UUID.randomUUID().toString(), initialFencingToken);
protected FencedRpcEndpoint(RpcService rpcService) {
this(rpcService, UUID.randomUUID().toString());
}
public F getFencingToken() {
return fencingToken;
}
protected void setFencingToken(F newFencingToken) {
protected void setFencingToken(@Nullable F newFencingToken) {
// this method should only be called from within the main thread
validateRunsInMainThread();
......
......@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
......@@ -45,23 +45,36 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
@Override
protected void handleMessage(Object message) {
if (message instanceof FencedMessage) {
@SuppressWarnings("unchecked")
FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);
F fencingToken = fencedMessage.getFencingToken();
final F expectedFencingToken = rpcEndpoint.getFencingToken();
if (Objects.equals(rpcEndpoint.getFencingToken(), fencingToken)) {
super.handleMessage(fencedMessage.getPayload());
} else {
if (expectedFencingToken == null) {
if (log.isDebugEnabled()) {
log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
"not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", message);
}
sendErrorIfSender(
new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message +
" because the fencing token " + fencingToken + " did not match the expected fencing token " +
rpcEndpoint.getFencingToken() + '.'));
new FencingTokenException(
"Fencing token not set: Ignoring message " + message + " because the fencing token is null."));
} else {
@SuppressWarnings("unchecked")
FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);
F fencingToken = fencedMessage.getFencingToken();
if (Objects.equals(expectedFencingToken, fencingToken)) {
super.handleMessage(fencedMessage.getPayload());
} else {
if (log.isDebugEnabled()) {
log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
"not match the expected fencing token {}.", message, fencingToken, expectedFencingToken);
}
sendErrorIfSender(
new FencingTokenException("Fencing token mismatch: Ignoring message " + message +
" because the fencing token " + fencingToken + " did not match the expected fencing token " +
expectedFencingToken + '.'));
}
}
} else if (message instanceof UnfencedMessage) {
super.handleMessage(((UnfencedMessage<?>) message).getPayload());
......
......@@ -25,18 +25,18 @@ import org.apache.flink.runtime.rpc.exceptions.RpcException;
* Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do
* not match.
*/
public class FencingTokenMismatchException extends RpcException {
public class FencingTokenException extends RpcException {
private static final long serialVersionUID = -500634972988881467L;
public FencingTokenMismatchException(String message) {
public FencingTokenException(String message) {
super(message);
}
public FencingTokenMismatchException(String message, Throwable cause) {
public FencingTokenException(String message, Throwable cause) {
super(message, cause);
}
public FencingTokenMismatchException(Throwable cause) {
public FencingTokenException(Throwable cause) {
super(cause);
}
}
......@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
......@@ -34,8 +36,8 @@ public class LocalFencedMessage<F extends Serializable, P> implements FencedMess
private final F fencingToken;
private final P payload;
public LocalFencedMessage(F fencingToken, P payload) {
this.fencingToken = Preconditions.checkNotNull(fencingToken);
public LocalFencedMessage(@Nullable F fencingToken, P payload) {
this.fencingToken = fencingToken;
this.payload = Preconditions.checkNotNull(payload);
}
......
......@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
......@@ -35,8 +37,8 @@ public class RemoteFencedMessage<F extends Serializable, P extends Serializable>
private final F fencingToken;
private final P payload;
public RemoteFencedMessage(F fencingToken, P payload) {
this.fencingToken = Preconditions.checkNotNull(fencingToken);
public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
this.fencingToken = fencingToken;
this.payload = Preconditions.checkNotNull(payload);
}
......
......@@ -108,7 +108,7 @@ public class ResourceManagerHATest extends TestLogger {
try {
resourceManager.start();
Assert.assertNotNull(resourceManager.getFencingToken());
Assert.assertNull(resourceManager.getFencingToken());
final UUID leaderId = UUID.randomUUID();
leaderElectionService.isLeader(leaderId);
// after grant leadership, resourceManager's leaderId has value
......
......@@ -38,7 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
......@@ -47,6 +47,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -81,10 +82,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
JobMasterId jobMasterId = JobMasterId.generate();
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
// test response successful
CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager(
jobMasterId,
......@@ -127,7 +132,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
fail("Should fail because we are using the wrong fencing token.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
if (testingFatalErrorHandler.hasExceptionOccurred()) {
......@@ -151,6 +156,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
JobMasterId differentJobMasterId = JobMasterId.generate();
CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
......@@ -182,6 +190,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
// test throw exception when receive a registration from job master which takes invalid address
String invalidAddress = "/jobMasterAddress2";
CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager(
......@@ -219,6 +230,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
JobID unknownJobIDToHAServices = new JobID();
// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
// this should fail because we try to register a job leader listener for an unknown job id
CompletableFuture<RegistrationResponse> registrationFuture = rmGateway.registerJobManager(
new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),
......
......@@ -30,7 +30,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
......@@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
} finally {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
......
......@@ -25,7 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
......@@ -231,7 +231,7 @@ public class AsyncCallsTest extends TestLogger {
fail("The async call operation should fail due to the changed fencing token.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
}
......@@ -346,10 +346,19 @@ public class AsyncCallsTest extends TestLogger {
UUID initialFencingToken,
OneShotLatch enteringSetNewFencingToken,
OneShotLatch triggerSetNewFencingToken) {
super(rpcService, initialFencingToken);
super(rpcService);
this.enteringSetNewFencingToken = enteringSetNewFencingToken;
this.triggerSetNewFencingToken = triggerSetNewFencingToken;
// make it look as if we are running in the main thread
currentMainThread.set(Thread.currentThread());
try {
setFencingToken(initialFencingToken);
} finally {
currentMainThread.set(null);
}
}
@Override
......
......@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
......@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -67,17 +68,15 @@ public class FencedRpcEndpointTest extends TestLogger {
*/
@Test
public void testFencingTokenSetting() throws Exception {
final UUID initialFencingToken = UUID.randomUUID();
final String value = "foobar";
FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
FencedTestingGateway fencedTestingGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value);
FencedTestingGateway fencedGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
try {
fencedTestingEndpoint.start();
assertEquals(initialFencingToken, fencedGateway.getFencingToken());
assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken());
assertNull(fencedGateway.getFencingToken());
assertNull(fencedTestingEndpoint.getFencingToken());
final UUID newFencingToken = UUID.randomUUID();
......@@ -88,9 +87,9 @@ public class FencedRpcEndpointTest extends TestLogger {
// expected to fail
}
assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken());
assertNull(fencedTestingEndpoint.getFencingToken());
CompletableFuture<Acknowledge> setFencingFuture = fencedTestingGateway.rpcSetFencingToken(newFencingToken, timeout);
CompletableFuture<Acknowledge> setFencingFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
// wait for the completion of the set fencing token operation
setFencingFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
......@@ -109,15 +108,15 @@ public class FencedRpcEndpointTest extends TestLogger {
*/
@Test
public void testFencing() throws Exception {
final UUID initialFencingToken = UUID.randomUUID();
final UUID fencingToken = UUID.randomUUID();
final UUID wrongFencingToken = UUID.randomUUID();
final String value = "barfoo";
FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, fencingToken);
try {
fencedTestingEndpoint.start();
final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, FencedTestingGateway.class)
final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), fencingToken, FencedTestingGateway.class)
.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final FencedTestingGateway wronglyFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), wrongFencingToken, FencedTestingGateway.class)
.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
......@@ -128,12 +127,12 @@ public class FencedRpcEndpointTest extends TestLogger {
wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
fail("This should fail since we have the wrong fencing token.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
final UUID newFencingToken = UUID.randomUUID();
CompletableFuture<Acknowledge> newFencingTokenFuture = properFencedGateway.rpcSetFencingToken(newFencingToken, timeout);
CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
// wait for the new fencing token to be set
newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
......@@ -144,7 +143,7 @@ public class FencedRpcEndpointTest extends TestLogger {
fail("This should fail since we have the wrong fencing token by now.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
} finally {
......@@ -163,7 +162,7 @@ public class FencedRpcEndpointTest extends TestLogger {
final UUID newFencingToken = UUID.randomUUID();
final String value = "foobar";
final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken);
try {
fencedTestingEndpoint.start();
......@@ -178,7 +177,7 @@ public class FencedRpcEndpointTest extends TestLogger {
assertEquals(value, selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
assertEquals(value, remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout);
CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
// wait for the new fencing token to be set
newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
......@@ -192,7 +191,7 @@ public class FencedRpcEndpointTest extends TestLogger {
remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
fail("This should have failed because we don't have the right fencing token.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
} finally {
fencedTestingEndpoint.shutDown();
......@@ -208,7 +207,7 @@ public class FencedRpcEndpointTest extends TestLogger {
final Time shortTimeout = Time.milliseconds(100L);
final UUID initialFencingToken = UUID.randomUUID();
final String value = "foobar";
final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken);
try {
fencedTestingEndpoint.start();
......@@ -221,7 +220,7 @@ public class FencedRpcEndpointTest extends TestLogger {
// therefore, we know that the change fencing token call is executed after the trigger MainThreadExecutor
// computation
final UUID newFencingToken = UUID.randomUUID();
CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout);
CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
......@@ -253,7 +252,7 @@ public class FencedRpcEndpointTest extends TestLogger {
final UUID initialFencingToken = UUID.randomUUID();
final String value = "foobar";
final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value);
final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken);
try {
fencedTestingEndpoint.start();
......@@ -283,8 +282,6 @@ public class FencedRpcEndpointTest extends TestLogger {
public interface FencedTestingGateway extends FencedRpcGateway<UUID> {
CompletableFuture<String> foobar(@RpcTimeout Time timeout);
CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, @RpcTimeout Time timeout);
CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(@RpcTimeout Time timeout);
CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time timeout);
......@@ -296,12 +293,25 @@ public class FencedRpcEndpointTest extends TestLogger {
private final String value;
protected FencedTestingEndpoint(RpcService rpcService, UUID initialFencingToken, String value) {
super(rpcService, initialFencingToken);
protected FencedTestingEndpoint(RpcService rpcService, String value) {
this(rpcService, value, null);
}
protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken) {
super(rpcService);
computationLatch = new OneShotLatch();
this.value = value;
// make sure that it looks as if we are running in the main thread
currentMainThread.set(Thread.currentThread());
try {
setFencingToken(initialFencingToken);
} finally {
currentMainThread.set(null);
}
}
@Override
......@@ -309,13 +319,6 @@ public class FencedRpcEndpointTest extends TestLogger {
return CompletableFuture.completedFuture(value);
}
@Override
public CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, Time timeout) {
setFencingToken(fencingToken);
return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
public CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(Time timeout) {
return CompletableFuture.supplyAsync(
......@@ -340,5 +343,15 @@ public class FencedRpcEndpointTest extends TestLogger {
return CompletableFuture.completedFuture(Acknowledge.get());
}
public CompletableFuture<Acknowledge> setFencingTokenInMainThread(UUID fencingToken, Time timeout) {
return callAsyncWithoutFencing(
() -> {
setFencingToken(fencingToken);
return Acknowledge.get();
},
timeout);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册