[hotfix][rpc] Add proper error reporting to AkkaRpcActor#handleControlMessage

上级 73958cc5
......@@ -119,7 +119,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(
new AkkaRpcException(
String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId())));
this.state = StoppedState.INSTANCE;
this.state = StoppedState.STOPPED;
}
@Override
......@@ -164,18 +164,23 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
}
private void handleControlMessage(ControlMessages controlMessage) {
switch (controlMessage) {
case START:
state = state.start(this);
break;
case STOP:
state = state.stop();
break;
case TERMINATE:
state.terminate(this);
break;
default:
handleUnknownControlMessage(controlMessage);
try {
switch (controlMessage) {
case START:
state = state.start(this);
break;
case STOP:
state = state.stop();
break;
case TERMINATE:
state = state.terminate(this);
break;
default:
handleUnknownControlMessage(controlMessage);
}
} catch (Exception e) {
this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
throw e;
}
}
......@@ -462,19 +467,19 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
interface State {
default State start(AkkaRpcActor<?> akkaRpcActor) {
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE));
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.STARTED));
}
default State stop() {
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.INSTANCE));
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.STOPPED));
}
default State terminate(AkkaRpcActor<?> akkaRpcActor) {
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.INSTANCE));
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.TERMINATING));
}
default State finishTermination() {
return TerminatedState.INSTANCE;
return TerminatedState.TERMINATED;
}
default boolean isRunning() {
......@@ -488,16 +493,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
@SuppressWarnings("Singleton")
enum StartedState implements State {
INSTANCE;
STARTED;
@Override
public State start(AkkaRpcActor<?> akkaRpcActor) {
return INSTANCE;
return STARTED;
}
@Override
public State stop() {
return StoppedState.INSTANCE;
return StoppedState.STOPPED;
}
@Override
......@@ -523,7 +528,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
terminationFuture.whenComplete((ignored, throwable) -> akkaRpcActor.stop(RpcEndpointTerminationResult.of(throwable)));
return TerminatingState.INSTANCE;
return TerminatingState.TERMINATING;
}
@Override
......@@ -534,7 +539,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
@SuppressWarnings("Singleton")
enum StoppedState implements State {
INSTANCE;
STOPPED;
@Override
public State start(AkkaRpcActor<?> akkaRpcActor) {
......@@ -552,25 +557,25 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
akkaRpcActor.mainThreadValidator.exitMainThread();
}
return StartedState.INSTANCE;
return StartedState.STARTED;
}
@Override
public State stop() {
return INSTANCE;
return STOPPED;
}
@Override
public State terminate(AkkaRpcActor<?> akkaRpcActor) {
akkaRpcActor.stop(RpcEndpointTerminationResult.success());
return TerminatingState.INSTANCE;
return TerminatingState.TERMINATING;
}
}
@SuppressWarnings("Singleton")
enum TerminatingState implements State {
INSTANCE;
TERMINATING;
@Override
public boolean isRunning() {
......@@ -579,7 +584,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
}
enum TerminatedState implements State {
INSTANCE
TERMINATED
}
private static final class RpcEndpointTerminationResult {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册