提交 f48f5340 编写于 作者: K kkloudas

[FLINK-7770][QS] Hide the queryable state behind a proxy.

Previously the QueryableStateClient could connect to the JM
and the TMs directly to fetch the required state. Now, there
is a proxy running on each TM and the remote client connects
to one of these proxies in order to get its state. The proxy
receives the request from the client, performs all necessary
message exchanges within the Flink cluster, receives the state
and forwards it back to the client.

This architecture allows for more security features to be
integrated in the future, as the proxy is running in the
cluster, it exposes less information about the cluster to
the outside world, and is more suitable for containerized
environments.
上级 29a6e995
......@@ -40,7 +40,7 @@ public class QueryableStateOptions {
/** Port to bind KvState server to (0 => pick random available port). */
public static final ConfigOption<Integer> SERVER_PORT =
key("query.server.port")
.defaultValue(0);
.defaultValue(9069);
/** Number of network (event loop) threads for the KvState server (0 => #slots). */
public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =
......
......@@ -150,7 +150,7 @@ public class AbstractID implements Comparable<AbstractID>, java.io.Serializable
((int) this.upperPart) ^
((int) (this.upperPart >>> 32));
}
@Override
public String toString() {
if (this.toString == null) {
......@@ -163,7 +163,7 @@ public class AbstractID implements Comparable<AbstractID>, java.io.Serializable
return this.toString;
}
@Override
public int compareTo(AbstractID o) {
int diff1 = (this.upperPart < o.upperPart) ? -1 : ((this.upperPart == o.upperPart) ? 0 : 1);
......
......@@ -18,18 +18,19 @@
package org.apache.flink.queryablestate;
import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
import org.apache.flink.annotation.Internal;
/**
* Exception to fail Future with if no JobManager is currently registered at
* the {@link KvStateLocationLookupService}.
* Exception to fail Future if the Task Manager on which the
* {@link org.apache.flink.runtime.query.KvStateClientProxy}
* is running on, does not know the active Job Manager.
*/
public class UnknownJobManager extends Exception {
@Internal
public class UnknownJobManagerException extends Exception {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 9092442511708951209L;
public UnknownJobManager() {
super("Unknown JobManager. Either the JobManager has not registered yet " +
"or has lost leadership.");
public UnknownJobManagerException() {
super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
}
}
......@@ -18,14 +18,22 @@
package org.apache.flink.queryablestate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.BadRequestException;
/**
* Thrown if the KvState does not hold any state for the given key or namespace.
*/
public class UnknownKeyOrNamespace extends IllegalStateException {
@Internal
public class UnknownKeyOrNamespaceException extends BadRequestException {
private static final long serialVersionUID = 1L;
public UnknownKeyOrNamespace() {
super("KvState does not hold any state for key/namespace.");
/**
* Creates the exception.
* @param serverName the name of the server that threw the exception.
*/
public UnknownKeyOrNamespaceException(String serverName) {
super(serverName, "No state for the specified key/namespace.");
}
}
......@@ -18,18 +18,25 @@
package org.apache.flink.queryablestate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.BadRequestException;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.util.Preconditions;
/**
* Thrown if no KvState with the given ID cannot found by the server handler.
*/
public class UnknownKvStateID extends IllegalStateException {
@Internal
public class UnknownKvStateIdException extends BadRequestException {
private static final long serialVersionUID = 1L;
public UnknownKvStateID(KvStateID kvStateId) {
super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") +
" at TaskManager.");
/**
* Creates the exception.
* @param serverName the name of the server that threw the exception.
* @param kvStateId the state id for which no state was found.
*/
public UnknownKvStateIdException(String serverName, KvStateID kvStateId) {
super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.');
}
}
......@@ -18,14 +18,24 @@
package org.apache.flink.queryablestate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.BadRequestException;
import org.apache.flink.runtime.query.KvStateLocation;
/**
* Exception thrown if there is no location information available for the given
* key group in a {@link KvStateLocation} instance.
*/
public class UnknownKvStateKeyGroupLocation extends Exception {
@Internal
public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
private static final long serialVersionUID = 1L;
/**
* Creates the exception.
* @param serverName the name of the server that threw the exception.
*/
public UnknownKvStateKeyGroupLocationException(String serverName) {
super(serverName, "Unknown key-group location.");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.client;
import org.apache.flink.api.common.JobID;
import org.apache.flink.queryablestate.UnknownJobManager;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.util.Preconditions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
import java.util.concurrent.Callable;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
/**
* Akka-based {@link KvStateLocationLookupService} that retrieves the current
* JobManager address and uses it for lookups.
*/
public class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);
/** Future returned when no JobManager is available. */
private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
/** Leader retrieval service to retrieve the current job manager. */
private final LeaderRetrievalService leaderRetrievalService;
/** The actor system used to resolve the JobManager address. */
private final ActorSystem actorSystem;
/** Timeout for JobManager ask-requests. */
private final FiniteDuration askTimeout;
/** Retry strategy factory on future failures. */
private final LookupRetryStrategyFactory retryStrategyFactory;
/** Current job manager future. */
private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
/**
* Creates the Akka-based {@link KvStateLocationLookupService}.
*
* @param leaderRetrievalService Leader retrieval service to use.
* @param actorSystem Actor system to use.
* @param askTimeout Timeout for JobManager ask-requests.
* @param retryStrategyFactory Retry strategy if no JobManager available.
*/
public AkkaKvStateLocationLookupService(
LeaderRetrievalService leaderRetrievalService,
ActorSystem actorSystem,
FiniteDuration askTimeout,
LookupRetryStrategyFactory retryStrategyFactory) {
this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
}
public void start() {
try {
leaderRetrievalService.start(this);
} catch (Exception e) {
LOG.error("Failed to start leader retrieval service", e);
throw new RuntimeException(e);
}
}
public void shutDown() {
try {
leaderRetrievalService.stop();
} catch (Exception e) {
LOG.error("Failed to stop leader retrieval service", e);
throw new RuntimeException(e);
}
}
@Override
@SuppressWarnings("unchecked")
public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) {
return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy());
}
/**
* Returns a future holding the {@link KvStateLocation} for the given job
* and KvState registration name.
*
* <p>If there is currently no JobManager registered with the service, the
* request is retried. The retry behaviour is specified by the
* {@link LookupRetryStrategy} of the lookup service.
*
* @param jobId JobID the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
* @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures.
* @return Future holding the {@link KvStateLocation}
*/
@SuppressWarnings("unchecked")
private Future<KvStateLocation> getKvStateLookupInfo(
final JobID jobId,
final String registrationName,
final LookupRetryStrategy lookupRetryStrategy) {
return jobManagerFuture
.flatMap(new Mapper<ActorGateway, Future<Object>>() {
@Override
public Future<Object> apply(ActorGateway jobManager) {
// Lookup the KvStateLocation
Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
return jobManager.ask(msg, askTimeout);
}
}, actorSystem.dispatcher())
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
.recoverWith(new Recover<Future<KvStateLocation>>() {
@Override
public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
// If the Future fails with UnknownJobManager, retry
// the request. Otherwise all Futures will be failed
// during the start up phase, when the JobManager did
// not notify this service yet or leadership is lost
// intermittently.
if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
return Patterns.after(
lookupRetryStrategy.getRetryDelay(),
actorSystem.scheduler(),
actorSystem.dispatcher(),
new Callable<Future<KvStateLocation>>() {
@Override
public Future<KvStateLocation> call() throws Exception {
return getKvStateLookupInfo(
jobId,
registrationName,
lookupRetryStrategy);
}
});
} else {
return Futures.failed(failure);
}
}
}, actorSystem.dispatcher());
}
@Override
public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
}
if (leaderAddress == null) {
jobManagerFuture = UNKNOWN_JOB_MANAGER;
} else {
jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
.map(new Mapper<ActorRef, ActorGateway>() {
@Override
public ActorGateway apply(ActorRef actorRef) {
return new AkkaActorGateway(actorRef, leaderSessionID);
}
}, actorSystem.dispatcher());
}
}
@Override
public void handleError(Exception exception) {
jobManagerFuture = Futures.failed(exception);
}
// ------------------------------------------------------------------------
/**
* Retry strategy for failed lookups.
*
* <p>Usage:
* <pre>
* LookupRetryStrategy retryStrategy = LookupRetryStrategyFactory.create();
*
* if (retryStrategy.tryRetry()) {
* // OK to retry
* FiniteDuration retryDelay = retryStrategy.getRetryDelay();
* }
* </pre>
*/
public interface LookupRetryStrategy {
/**
* Returns the current retry.
*
* @return Current retry delay.
*/
FiniteDuration getRetryDelay();
/**
* Tries another retry and returns whether it is allowed or not.
*
* @return Whether it is allowed to do another restart or not.
*/
boolean tryRetry();
}
/**
* Factory for retry strategies.
*/
public interface LookupRetryStrategyFactory {
/**
* Creates a new retry strategy.
*
* @return The retry strategy.
*/
LookupRetryStrategy createRetryStrategy();
}
/**
* Factory for disabled retries.
*/
public static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy();
@Override
public LookupRetryStrategy createRetryStrategy() {
return RETRY_STRATEGY;
}
private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
@Override
public FiniteDuration getRetryDelay() {
return FiniteDuration.Zero();
}
@Override
public boolean tryRetry() {
return false;
}
}
}
/**
* Factory for fixed delay retries.
*/
public static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
private final int maxRetries;
private final FiniteDuration retryDelay;
FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
this.maxRetries = maxRetries;
this.retryDelay = retryDelay;
}
@Override
public LookupRetryStrategy createRetryStrategy() {
return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
}
private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
private final Object retryLock = new Object();
private final int maxRetries;
private final FiniteDuration retryDelay;
private int numRetries;
public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
this.maxRetries = maxRetries;
this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
}
@Override
public FiniteDuration getRetryDelay() {
synchronized (retryLock) {
return retryDelay;
}
}
@Override
public boolean tryRetry() {
synchronized (retryLock) {
if (numRetries < maxRetries) {
numRetries++;
return true;
} else {
return false;
}
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.client.proxy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.UnknownKvStateIdException;
import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocationException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
/**
* This handler acts as an internal (to the Flink cluster) client that receives
* the requests from external clients, executes them by contacting the Job Manager (if necessary) and
* the Task Manager holding the requested state, and forwards the answer back to the client.
*/
@Internal
@ChannelHandler.Sharable
public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);
/** The proxy using this handler. */
private final KvStateClientProxy proxy;
/** A cache to hold the location of different states for which we have already seen requests. */
private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache =
new ConcurrentHashMap<>();
/**
* Network client to forward queries to {@link KvStateServerImpl state server}
* instances inside the cluster.
*/
private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;
/**
* Create the handler used by the {@link KvStateClientProxyImpl}.
*
* @param proxy the {@link KvStateClientProxyImpl proxy} using the handler.
* @param queryExecutorThreads the number of threads used to process incoming requests.
* @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
* @param stats server statistics collector.
*/
public KvStateClientProxyHandler(
final KvStateClientProxyImpl proxy,
final int queryExecutorThreads,
final MessageSerializer<KvStateRequest, KvStateResponse> serializer,
final KvStateRequestStats stats) {
super(proxy, serializer, stats);
this.proxy = Preconditions.checkNotNull(proxy);
this.kvStateClient = createInternalClient(queryExecutorThreads);
}
private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
new MessageSerializer<>(
new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
new KvStateResponse.KvStateResponseDeserializer());
return new Client<>(
"Queryable State Proxy Client",
threads,
messageSerializer,
new DisabledKvStateRequestStats());
}
@Override
public CompletableFuture<KvStateResponse> handleRequest(
final long requestId,
final KvStateRequest request) {
CompletableFuture<KvStateResponse> response = new CompletableFuture<>();
executeActionAsync(response, request, false);
return response;
}
private void executeActionAsync(
final CompletableFuture<KvStateResponse> result,
final KvStateRequest request,
final boolean update) {
if (!result.isDone()) {
final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
if (throwable instanceof CancellationException) {
result.completeExceptionally(throwable);
} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
throwable.getCause() instanceof UnknownKvStateLocation ||
throwable.getCause() instanceof ConnectException) {
// These failures are likely to be caused by out-of-sync
// KvStateLocation. Therefore we retry this query and
// force look up the location.
executeActionAsync(result, request, true);
} else {
result.completeExceptionally(throwable);
}
} else {
result.complete(t);
}
}, queryExecutor);
result.whenComplete(
(t, throwable) -> operationFuture.cancel(false));
}
}
private CompletableFuture<KvStateResponse> getState(
final KvStateRequest request,
final boolean forceUpdate) {
return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate)
.thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> {
final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
request.getKeyHashCode(), location.getNumKeyGroups());
final KvStateServerAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
if (serverAddress == null) {
return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
} else {
// Query server
final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
final KvStateInternalRequest internalRequest = new KvStateInternalRequest(
kvStateId, request.getSerializedKeyAndNamespace());
return kvStateClient.sendRequest(serverAddress, internalRequest);
}
}, queryExecutor);
}
/**
* Lookup the {@link KvStateLocation} for the given job and queryable state name.
*
* <p>The job manager will be queried for the location only if forced or no
* cached location can be found. There are no guarantees about
*
* @param jobId JobID the state instance belongs to.
* @param queryableStateName Name under which the state instance has been published.
* @param forceUpdate Flag to indicate whether to force a update via the lookup service.
* @return Future holding the KvStateLocation
*/
private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
final JobID jobId,
final String queryableStateName,
final boolean forceUpdate) {
final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) {
LOG.debug("Retrieving location for state={} of job={} from the cache.", jobId, queryableStateName);
return cachedFuture;
}
LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
return proxy.getJobManagerFuture().thenComposeAsync(
jobManagerGateway -> {
final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
lookupCache.put(cacheKey, locationFuture);
return locationFuture;
}, queryExecutor);
}
@Override
public void shutdown() {
kvStateClient.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.client.proxy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.UnknownJobManagerException;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.util.Preconditions;
import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;
/**
* The default implementation of the {@link KvStateClientProxy}.
*/
@Internal
public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
FutureUtils.getFailedFuture(new UnknownJobManagerException());
/** Number of threads used to process incoming requests. */
private final int queryExecutorThreads;
/** Statistics collector. */
private final KvStateRequestStats stats;
private final Object leaderLock = new Object();
private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
/**
* Creates the Queryable State Client Proxy.
*
* <p>The server is instantiated using reflection by the
* {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)
* QueryableStateUtils.startKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)}.
*
* <p>The server needs to be started via {@link #start()} in order to bind
* to the configured bind address.
*
* @param bindAddress the address to listen to.
* @param bindPort the port to listen to.
* @param numEventLoopThreads number of event loop threads.
* @param numQueryThreads number of query threads.
* @param stats the statistics collector.
*/
public KvStateClientProxyImpl(
final InetAddress bindAddress,
final Integer bindPort,
final Integer numEventLoopThreads,
final Integer numQueryThreads,
final KvStateRequestStats stats) {
super("Queryable State Proxy Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads);
Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
this.queryExecutorThreads = numQueryThreads;
this.stats = Preconditions.checkNotNull(stats);
}
@Override
public KvStateServerAddress getServerAddress() {
return super.getServerAddress();
}
@Override
public void start() throws InterruptedException {
super.start();
}
@Override
public void shutdown() {
super.shutdown();
}
@Override
public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
synchronized (leaderLock) {
if (leadingJobManager == null) {
jobManagerFuture = UNKNOWN_JOB_MANAGER;
} else {
jobManagerFuture = leadingJobManager;
}
}
}
@Override
public CompletableFuture<ActorGateway> getJobManagerFuture() {
synchronized (leaderLock) {
return jobManagerFuture;
}
}
@Override
public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() {
MessageSerializer<KvStateRequest, KvStateResponse> serializer =
new MessageSerializer<>(
new KvStateRequest.KvStateRequestDeserializer(),
new KvStateResponse.KvStateResponseDeserializer());
return new KvStateClientProxyHandler(this, queryExecutorThreads, serializer, stats);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.messages;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
/**
* The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy
* Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server}
* of the Task Manager responsible for the requested state.
*/
@Internal
public class KvStateInternalRequest extends MessageBody {
private final KvStateID kvStateId;
private final byte[] serializedKeyAndNamespace;
public KvStateInternalRequest(
final KvStateID stateId,
final byte[] serializedKeyAndNamespace) {
this.kvStateId = Preconditions.checkNotNull(stateId);
this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
}
public KvStateID getKvStateId() {
return kvStateId;
}
public byte[] getSerializedKeyAndNamespace() {
return serializedKeyAndNamespace;
}
@Override
public byte[] serialize() {
// KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace
final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length;
return ByteBuffer.allocate(size)
.putLong(kvStateId.getLowerPart())
.putLong(kvStateId.getUpperPart())
.putInt(serializedKeyAndNamespace.length)
.put(serializedKeyAndNamespace)
.array();
}
/**
* A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}.
*/
public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> {
@Override
public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
int length = buf.readInt();
Preconditions.checkArgument(length >= 0,
"Negative length for key and namespace. " +
"This indicates a serialization error.");
byte[] serializedKeyAndNamespace = new byte[length];
if (length > 0) {
buf.readBytes(serializedKeyAndNamespace);
}
return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
}
}
}
......@@ -18,72 +18,124 @@
package org.apache.flink.queryablestate.messages;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* A {@link InternalKvState} instance request for a specific key and namespace.
* The request to be sent by the {@link org.apache.flink.queryablestate.client.QueryableStateClient
* Queryable State Client} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
* requesting a given state.
*/
public final class KvStateRequest {
@Internal
public class KvStateRequest extends MessageBody {
/** ID for this request. */
private final long requestId;
private final JobID jobId;
private final String stateName;
private final int keyHashCode;
private final byte[] serializedKeyAndNamespace;
/** ID of the requested KvState instance. */
private final KvStateID kvStateId;
public KvStateRequest(
final JobID jobId,
final String stateName,
final int keyHashCode,
final byte[] serializedKeyAndNamespace) {
/** Serialized key and namespace to request from the KvState instance. */
private final byte[] serializedKeyAndNamespace;
this.jobId = Preconditions.checkNotNull(jobId);
this.stateName = Preconditions.checkNotNull(stateName);
this.keyHashCode = keyHashCode;
this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
}
/**
* Creates a KvState instance request.
*
* @param requestId ID for this request
* @param kvStateId ID of the requested KvState instance
* @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState
* instance
*/
public KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
this.requestId = requestId;
this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
public JobID getJobId() {
return jobId;
}
/**
* Returns the request ID.
*
* @return Request ID
*/
public long getRequestId() {
return requestId;
public String getStateName() {
return stateName;
}
/**
* Returns the ID of the requested KvState instance.
*
* @return ID of the requested KvState instance
*/
public KvStateID getKvStateId() {
return kvStateId;
public int getKeyHashCode() {
return keyHashCode;
}
/**
* Returns the serialized key and namespace to request from the KvState
* instance.
*
* @return Serialized key and namespace to request from the KvState instance
*/
public byte[] getSerializedKeyAndNamespace() {
return serializedKeyAndNamespace;
}
@Override
public byte[] serialize() {
byte[] serializedStateName = stateName.getBytes();
// JobID + stateName + sizeOf(stateName) + hashCode + keyAndNamespace + sizeOf(keyAndNamespace)
final int size =
JobID.SIZE +
serializedStateName.length + Integer.BYTES +
Integer.BYTES +
serializedKeyAndNamespace.length + Integer.BYTES;
return ByteBuffer.allocate(size)
.putLong(jobId.getLowerPart())
.putLong(jobId.getUpperPart())
.putInt(serializedStateName.length)
.put(serializedStateName)
.putInt(keyHashCode)
.putInt(serializedKeyAndNamespace.length)
.put(serializedKeyAndNamespace)
.array();
}
@Override
public String toString() {
return "KvStateRequest{" +
"requestId=" + requestId +
", kvStateId=" + kvStateId +
", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length +
"jobId=" + jobId +
", stateName='" + stateName + '\'' +
", keyHashCode=" + keyHashCode +
", serializedKeyAndNamespace=" + Arrays.toString(serializedKeyAndNamespace) +
'}';
}
/**
* A {@link MessageDeserializer deserializer} for {@link KvStateRequest}.
*/
public static class KvStateRequestDeserializer implements MessageDeserializer<KvStateRequest> {
@Override
public KvStateRequest deserializeMessage(ByteBuf buf) {
JobID jobId = new JobID(buf.readLong(), buf.readLong());
int statenameLength = buf.readInt();
Preconditions.checkArgument(statenameLength >= 0,
"Negative length for state name. " +
"This indicates a serialization error.");
String stateName = "";
if (statenameLength > 0) {
byte[] name = new byte[statenameLength];
buf.readBytes(name);
stateName = new String(name);
}
int keyHashCode = buf.readInt();
int knamespaceLength = buf.readInt();
Preconditions.checkArgument(knamespaceLength >= 0,
"Negative length for key and namespace. " +
"This indicates a serialization error.");
byte[] serializedKeyAndNamespace = new byte[knamespaceLength];
if (knamespaceLength > 0) {
buf.readBytes(serializedKeyAndNamespace);
}
return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace);
}
}
}
......@@ -18,57 +18,58 @@
package org.apache.flink.queryablestate.messages;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
/**
* A successful response to a {@link KvStateRequest} containing the serialized
* result for the requested key and namespace.
* The response containing the (serialized) state sent by the {@link org.apache.flink.runtime.query.KvStateServer
* State Server} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}, and then forwarded
* by the proxy to the original {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State
* Client}.
*/
public final class KvStateRequestResult {
@Internal
public class KvStateResponse extends MessageBody {
/** ID of the request responding to. */
private final long requestId;
private final byte[] content;
/**
* Serialized result for the requested key and namespace. If no result was
* available for the specified key and namespace, this is <code>null</code>.
*/
private final byte[] serializedResult;
public KvStateResponse(final byte[] content) {
this.content = Preconditions.checkNotNull(content);
}
/**
* Creates a successful {@link KvStateRequestResult} response.
*
* @param requestId ID of the request responding to
* @param serializedResult Serialized result or <code>null</code> if none
*/
public KvStateRequestResult(long requestId, byte[] serializedResult) {
this.requestId = requestId;
this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result");
public byte[] getContent() {
return content;
}
/**
* Returns the request ID responding to.
*
* @return Request ID responding to
*/
public long getRequestId() {
return requestId;
@Override
public byte[] serialize() {
final int size = Integer.BYTES + content.length;
return ByteBuffer.allocate(size)
.putInt(content.length)
.put(content)
.array();
}
/**
* Returns the serialized result or <code>null</code> if none available.
*
* @return Serialized result or <code>null</code> if none available.
* A {@link MessageDeserializer deserializer} for {@link KvStateResponseDeserializer}.
*/
public byte[] getSerializedResult() {
return serializedResult;
}
public static class KvStateResponseDeserializer implements MessageDeserializer<KvStateResponse> {
@Override
public String toString() {
return "KvStateRequestResult{" +
"requestId=" + requestId +
", serializedResult.length=" + serializedResult.length +
'}';
@Override
public KvStateResponse deserializeMessage(ByteBuf buf) {
int length = buf.readInt();
Preconditions.checkArgument(length >= 0,
"Negative length for state content. " +
"This indicates a serialization error.");
byte[] content = new byte[length];
buf.readBytes(content);
return new KvStateResponse(content);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* The base class for every server in the queryable state module.
* It is using pure netty to send and receive messages of type {@link MessageBody}.
*
* @param <REQ> the type of request the server expects to receive.
* @param <RESP> the type of response the server will send.
*/
@Internal
public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
/** AbstractServerBase config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
/** AbstractServerBase config: high water mark. */
private static final int HIGH_WATER_MARK = 32 * 1024;
private final String serverName;
/** Netty's ServerBootstrap. */
private final ServerBootstrap bootstrap;
/** Query executor thread pool. */
private final ExecutorService queryExecutor;
/** Address of this server. */
private KvStateServerAddress serverAddress;
/** The handler used for the incoming messages. */
private AbstractServerHandler<REQ, RESP> handler;
/**
* Creates the {@link AbstractServerBase}.
*
* <p>The server needs to be started via {@link #start()} in order to bind
* to the configured bind address.
*
* @param serverName the name of the server
* @param bindAddress address to bind to
* @param bindPort port to bind to (random port if 0)
* @param numEventLoopThreads number of event loop threads
*/
protected AbstractServerBase(
final String serverName,
final InetAddress bindAddress,
final Integer bindPort,
final Integer numEventLoopThreads,
final Integer numQueryThreads) {
Preconditions.checkNotNull(bindAddress);
Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort + " out of valid range (0-65536).");
Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
this.serverName = Preconditions.checkNotNull(serverName);
this.queryExecutor = createQueryExecutor(numQueryThreads);
final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Flink " + serverName + " EventLoop Thread %d")
.build();
final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
bootstrap = new ServerBootstrap()
// Bind address and port
.localAddress(bindAddress, bindPort)
// NIO server channels
.group(nioGroup)
.channel(NioServerSocketChannel.class)
// AbstractServerBase channel Options
.option(ChannelOption.ALLOCATOR, bufferPool)
// Child channel options
.childOption(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
}
/**
* Creates a thread pool for the query execution.
*
* @param numQueryThreads Number of query threads.
* @return Thread pool for query execution
*/
private ExecutorService createQueryExecutor(int numQueryThreads) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Flink " + getServerName() + " Thread %d")
.build();
return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
}
protected ExecutorService getQueryExecutor() {
return queryExecutor;
}
public String getServerName() {
return serverName;
}
public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
/**
* Starts the server by binding to the configured bind address (blocking).
* @throws InterruptedException If interrupted during the bind operation
*/
public void start() throws InterruptedException {
Preconditions.checkState(serverAddress == null,
"Server " + serverName + " has already been started @ " + serverAddress + '.');
this.handler = initializeHandler();
bootstrap.childHandler(new ServerChannelInitializer<>(handler));
Channel channel = bootstrap.bind().sync().channel();
InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
LOG.info("Started server {} @ {}", serverName, serverAddress);
}
/**
* Returns the address of this server.
*
* @return AbstractServerBase address
* @throws IllegalStateException If server has not been started yet
*/
public KvStateServerAddress getServerAddress() {
Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started.");
return serverAddress;
}
/**
* Shuts down the server and all related thread pools.
*/
public void shutdown() {
LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
if (handler != null) {
handler.shutdown();
}
if (queryExecutor != null) {
queryExecutor.shutdown();
}
if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
if (group != null) {
group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
}
}
serverAddress = null;
}
/**
* Channel pipeline initializer.
*
* <p>The request handler is shared, whereas the other handlers are created
* per channel.
*/
private static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInitializer<SocketChannel> {
/** The shared request handler. */
private final AbstractServerHandler<REQ, RESP> sharedRequestHandler;
/**
* Creates the channel pipeline initializer with the shared request handler.
*
* @param sharedRequestHandler Shared request handler.
*/
ServerChannelInitializer(AbstractServerHandler<REQ, RESP> sharedRequestHandler) {
this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "MessageBody handler");
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new ChunkedWriteHandler())
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addLast(sharedRequestHandler);
}
}
@VisibleForTesting
public boolean isExecutorShutdown() {
return queryExecutor.isShutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The base class of every handler used by an {@link AbstractServerBase}.
*
* @param <REQ> the type of request the server expects to receive.
* @param <RESP> the type of response the server will send.
*/
@Internal
@ChannelHandler.Sharable
public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class);
/** The owning server of this handler. */
private final AbstractServerBase<REQ, RESP> server;
/** The serializer used to (de-)serialize messages. */
private final MessageSerializer<REQ, RESP> serializer;
/** Thread pool for query execution. */
protected final ExecutorService queryExecutor;
/** Exposed server statistics. */
private final KvStateRequestStats stats;
/**
* Create the handler.
*
* @param serializer the serializer used to (de-)serialize messages
* @param stats statistics collector
*/
public AbstractServerHandler(
final AbstractServerBase<REQ, RESP> server,
final MessageSerializer<REQ, RESP> serializer,
final KvStateRequestStats stats) {
this.server = Preconditions.checkNotNull(server);
this.serializer = Preconditions.checkNotNull(serializer);
this.queryExecutor = server.getQueryExecutor();
this.stats = Preconditions.checkNotNull(stats);
}
protected String getServerName() {
return server.getServerName();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
stats.reportActiveConnection();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
stats.reportInactiveConnection();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
REQ request = null;
long requestId = -1L;
try {
final ByteBuf buf = (ByteBuf) msg;
final MessageType msgType = MessageSerializer.deserializeHeader(buf);
requestId = MessageSerializer.getRequestId(buf);
if (msgType == MessageType.REQUEST) {
// ------------------------------------------------------------
// MessageBody
// ------------------------------------------------------------
request = serializer.deserializeRequest(buf);
stats.reportRequest();
// Execute actual query async, because it is possibly
// blocking (e.g. file I/O).
//
// A submission failure is not treated as fatal. todo here if there is a shared resource e.g. registry, then I will have to sync on that.
queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats));
} else {
// ------------------------------------------------------------
// Unexpected
// ------------------------------------------------------------
final String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + ".";
final ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg));
LOG.debug(errMsg);
ctx.writeAndFlush(failure);
}
} catch (Throwable t) {
final String stringifiedCause = ExceptionUtils.stringifyException(t);
String errMsg;
ByteBuf err;
if (request != null) {
errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause;
err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
stats.reportFailedRequest();
} else {
errMsg = "Failed incoming message. Caused by: " + stringifiedCause;
err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg));
}
LOG.debug(errMsg);
ctx.writeAndFlush(err);
} finally {
// IMPORTANT: We have to always recycle the incoming buffer.
// Otherwise we will leak memory out of Netty's buffer pool.
//
// If any operation ever holds on to the buffer, it is the
// responsibility of that operation to retain the buffer and
// release it later.
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause);
final ByteBuf err = serializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
LOG.debug(msg);
ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
}
/**
* Handles an incoming request and returns a {@link CompletableFuture} containing the corresponding response.
*
* <p><b>NOTE:</b> This method is called by multiple threads.
*
* @param requestId the id of the received request to be handled.
* @param request the request to be handled.
* @return A future with the response to be forwarded to the client.
*/
public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
/**
* Shuts down any handler specific resources, e.g. thread pools etc.
*/
public abstract void shutdown();
/**
* Task to execute the actual query against the {@link InternalKvState} instance.
*/
private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable {
private final AbstractServerHandler<REQ, RESP> handler;
private final ChannelHandlerContext ctx;
private final long requestId;
private final REQ request;
private final KvStateRequestStats stats;
private final long creationNanos;
AsyncRequestTask(
final AbstractServerHandler<REQ, RESP> handler,
final ChannelHandlerContext ctx,
final long requestId,
final REQ request,
final KvStateRequestStats stats) {
this.handler = Preconditions.checkNotNull(handler);
this.ctx = Preconditions.checkNotNull(ctx);
this.requestId = requestId;
this.request = Preconditions.checkNotNull(request);
this.stats = Preconditions.checkNotNull(stats);
this.creationNanos = System.nanoTime();
}
@Override
public void run() {
if (!ctx.channel().isActive()) {
return;
}
handler.handleRequest(requestId, request).whenComplete((resp, throwable) -> {
try {
if (throwable != null) {
throw throwable instanceof CompletionException
? throwable.getCause()
: throwable;
}
if (resp == null) {
throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + ".");
}
final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp);
int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
ChannelFuture write;
if (serialResp.readableBytes() <= highWatermark) {
write = ctx.writeAndFlush(serialResp);
} else {
write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark));
}
write.addListener(new RequestWriteListener());
} catch (BadRequestException e) {
try {
stats.reportFailedRequest();
final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, e);
ctx.writeAndFlush(err);
} catch (IOException io) {
LOG.error("Failed to respond with the error after failed request", io);
}
} catch (Throwable t) {
try {
stats.reportFailedRequest();
final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
ctx.writeAndFlush(err);
} catch (IOException io) {
LOG.error("Failed to respond with the error after failed request", io);
}
}
});
}
@Override
public String toString() {
return "AsyncRequestTask{" +
"requestId=" + requestId +
", request=" + request +
'}';
}
/**
* Callback after query result has been written.
*
* <p>Gathers stats and logs errors.
*/
private class RequestWriteListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
long durationNanos = System.nanoTime() - creationNanos;
long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
if (future.isSuccess()) {
LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis);
stats.reportSuccessfulRequest(durationMillis);
} else {
LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause());
stats.reportFailedRequest();
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
/**
* Base class for exceptions thrown during querying Flink's managed state.
*/
@Internal
public class BadRequestException extends Exception {
private static final long serialVersionUID = 3458743952407632903L;
public BadRequestException(String serverName, String message) {
super(Preconditions.checkNotNull(serverName) + " : " + message);
}
}
......@@ -16,8 +16,9 @@
* limitations under the License.
*/
package org.apache.flink.queryablestate.server;
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
......@@ -31,6 +32,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle
*
* @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
*/
@Internal
public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
/** The buffer to chunk. */
......
......@@ -16,13 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.queryablestate.client;
package org.apache.flink.queryablestate.network;
import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
import org.apache.flink.queryablestate.messages.KvStateRequestResult;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
import org.apache.flink.queryablestate.network.messages.RequestFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
......@@ -35,24 +36,37 @@ import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException;
/**
* This handler expects responses from {@link KvStateServerHandler}.
* The handler used by a {@link Client} to handling incoming messages.
*
* <p>It deserializes the response and calls the registered callback, which is
* responsible for actually handling the result (see {@link KvStateClient.EstablishedConnection}).
* @param <REQ> the type of request the client will send.
* @param <RESP> the type of response the client expects to receive.
*/
public class KvStateClientHandler extends ChannelInboundHandlerAdapter {
@Internal
public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(KvStateClientHandler.class);
private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
private final KvStateClientHandlerCallback callback;
private final String clientName;
private final MessageSerializer<REQ, RESP> serializer;
private final ClientHandlerCallback<RESP> callback;
/**
* Creates a {@link KvStateClientHandler} with the callback.
* Creates a handler with the callback.
*
* @param clientName the name of the client.
* @param serializer the serializer used to (de-)serialize messages.
* @param callback Callback for responses.
*/
public KvStateClientHandler(KvStateClientHandlerCallback callback) {
this.callback = callback;
public ClientHandler(
final String clientName,
final MessageSerializer<REQ, RESP> serializer,
final ClientHandlerCallback<RESP> callback) {
this.clientName = Preconditions.checkNotNull(clientName);
this.serializer = Preconditions.checkNotNull(serializer);
this.callback = Preconditions.checkNotNull(callback);
}
@Override
......@@ -62,10 +76,11 @@ public class KvStateClientHandler extends ChannelInboundHandlerAdapter {
MessageType msgType = MessageSerializer.deserializeHeader(buf);
if (msgType == MessageType.REQUEST_RESULT) {
KvStateRequestResult result = MessageSerializer.deserializeKvStateRequestResult(buf);
callback.onRequestResult(result.getRequestId(), result.getSerializedResult());
long requestId = MessageSerializer.getRequestId(buf);
RESP result = serializer.deserializeResponse(buf);
callback.onRequestResult(requestId, result);
} else if (msgType == MessageType.REQUEST_FAILURE) {
KvStateRequestFailure failure = MessageSerializer.deserializeKvStateRequestFailure(buf);
RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf);
callback.onRequestFailure(failure.getRequestId(), failure.getCause());
} else if (msgType == MessageType.SERVER_FAILURE) {
throw MessageSerializer.deserializeServerFailure(buf);
......
......@@ -16,25 +16,27 @@
* limitations under the License.
*/
package org.apache.flink.queryablestate.client;
package org.apache.flink.queryablestate.network;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.messages.MessageBody;
/**
* Callback for {@link KvStateClientHandler}.
* Callback for {@link ClientHandler}.
*/
public interface KvStateClientHandlerCallback {
@Internal
public interface ClientHandlerCallback<RESP extends MessageBody> {
/**
* Called on a successful {@link KvStateRequest}.
* Called on a successful request.
*
* @param requestId ID of the request
* @param serializedValue Serialized value for the request
* @param requestId ID of the request
* @param response The received response
*/
void onRequestResult(long requestId, byte[] serializedValue);
void onRequestResult(long requestId, RESP response);
/**
* Called on a failed {@link KvStateRequest}.
* Called on a failed request.
*
* @param requestId ID of the request
* @param cause Cause of the request failure
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.network.messages;
import org.apache.flink.annotation.Internal;
/**
* The base class for every message exchanged during the communication between
* {@link org.apache.flink.queryablestate.network.Client client} and
* {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
*
* <p>Every such message should also have a {@link MessageDeserializer}.
*/
@Internal
public abstract class MessageBody {
/**
* Serializes the message into a byte array.
* @return A byte array with the serialized content of the message.
*/
public abstract byte[] serialize();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.network.messages;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
/**
* A utility used to deserialize a {@link MessageBody message}.
* @param <M> The type of the message to be deserialized.
* It has to extend {@link MessageBody}
*/
@Internal
public interface MessageDeserializer<M extends MessageBody> {
/**
* Deserializes a message contained in a byte buffer.
* @param buf the buffer containing the message.
* @return The deserialized message.
*/
M deserializeMessage(ByteBuf buf);
}
......@@ -18,11 +18,7 @@
package org.apache.flink.queryablestate.network.messages;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
import org.apache.flink.queryablestate.messages.KvStateRequestResult;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
......@@ -37,8 +33,8 @@ import java.io.ObjectOutputStream;
/**
* Serialization and deserialization of messages exchanged between
* {@link org.apache.flink.queryablestate.client.KvStateClient client} and
* {@link org.apache.flink.queryablestate.server.KvStateServerImpl server}.
* {@link org.apache.flink.queryablestate.network.Client client} and
* {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
*
* <p>The binary messages have the following format:
*
......@@ -52,8 +48,12 @@ import java.io.ObjectOutputStream;
* </pre>
*
* <p>The concrete content of a message depends on the {@link MessageType}.
*
* @param <REQ> Type of the requests of the protocol.
* @param <RESP> Type of the responses of the protocol.
*/
public final class MessageSerializer {
@Internal
public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> {
/** The serialization version ID. */
private static final int VERSION = 0x79a1b710;
......@@ -64,78 +64,58 @@ public final class MessageSerializer {
/** Byte length of the request id. */
private static final int REQUEST_ID_SIZE = Long.BYTES;
/** The constructor of the {@link MessageBody client requests}. Used for deserialization. */
private final MessageDeserializer<REQ> requestDeserializer;
/** The constructor of the {@link MessageBody server responses}. Used for deserialization. */
private final MessageDeserializer<RESP> responseDeserializer;
public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) {
requestDeserializer = Preconditions.checkNotNull(requestDeser);
responseDeserializer = Preconditions.checkNotNull(responseDeser);
}
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
/**
* Allocates a buffer and serializes the KvState request into it.
* Serializes the request sent to the
* {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
*
* @param alloc ByteBuf allocator for the buffer to
* serialize message into
* @param requestId ID for this request
* @param kvStateId ID of the requested KvState instance
* @param serializedKeyAndNamespace Serialized key and namespace to request
* from the KvState instance.
* @return Serialized KvState request message
* @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
* @param requestId The id of the request to which the message refers to.
* @param request The request to be serialized.
* @return A {@link ByteBuf} containing the serialized message.
*/
public static ByteBuf serializeKvStateRequest(
ByteBufAllocator alloc,
long requestId,
KvStateID kvStateId,
byte[] serializedKeyAndNamespace) {
// Header + request ID + KvState ID + Serialized namespace
int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + AbstractID.SIZE + (Integer.BYTES + serializedKeyAndNamespace.length);
ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame length
buf.writeInt(frameLength);
writeHeader(buf, MessageType.REQUEST);
buf.writeLong(requestId);
buf.writeLong(kvStateId.getLowerPart());
buf.writeLong(kvStateId.getUpperPart());
buf.writeInt(serializedKeyAndNamespace.length);
buf.writeBytes(serializedKeyAndNamespace);
return buf;
public static <REQ extends MessageBody> ByteBuf serializeRequest(
final ByteBufAllocator alloc,
final long requestId,
final REQ request) {
Preconditions.checkNotNull(request);
return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize());
}
/**
* Allocates a buffer and serializes the KvState request result into it.
* Serializes the response sent to the
* {@link org.apache.flink.queryablestate.network.Client}.
*
* @param alloc ByteBuf allocator for the buffer to serialize message into
* @param requestId ID for this request
* @param serializedResult Serialized Result
* @return Serialized KvState request result message
* @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
* @param requestId The id of the request to which the message refers to.
* @param response The response to be serialized.
* @return A {@link ByteBuf} containing the serialized message.
*/
public static ByteBuf serializeKvStateRequestResult(
ByteBufAllocator alloc,
long requestId,
byte[] serializedResult) {
Preconditions.checkNotNull(serializedResult, "Serialized result");
// Header + request ID + serialized result
int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + 4 + serializedResult.length;
// TODO: 10/5/17 there was a bug all this time?
ByteBuf buf = alloc.ioBuffer(frameLength + 4);
buf.writeInt(frameLength);
writeHeader(buf, MessageType.REQUEST_RESULT);
buf.writeLong(requestId);
buf.writeInt(serializedResult.length);
buf.writeBytes(serializedResult);
return buf;
public static <RESP extends MessageBody> ByteBuf serializeResponse(
final ByteBufAllocator alloc,
final long requestId,
final RESP response) {
Preconditions.checkNotNull(response);
return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize());
}
/**
* Serializes the exception containing the failure message sent to the
* {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
* {@link org.apache.flink.queryablestate.network.Client} in case of
* protocol related errors.
*
* @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
......@@ -143,7 +123,7 @@ public final class MessageSerializer {
* @param cause The exception thrown at the server.
* @return A {@link ByteBuf} containing the serialized message.
*/
public static ByteBuf serializeKvStateRequestFailure(
public static ByteBuf serializeRequestFailure(
final ByteBufAllocator alloc,
final long requestId,
final Throwable cause) throws IOException {
......@@ -168,7 +148,7 @@ public final class MessageSerializer {
/**
* Serializes the failure message sent to the
* {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
* {@link org.apache.flink.queryablestate.network.Client} in case of
* server related errors.
*
* @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
......@@ -207,6 +187,31 @@ public final class MessageSerializer {
buf.writeInt(messageType.ordinal());
}
/**
* Helper for serializing the messages.
*
* @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
* @param requestId The id of the request to which the message refers to.
* @param messageType The {@link MessageType type of the message}.
* @param payload The serialized version of the message.
* @return A {@link ByteBuf} containing the serialized message.
*/
private static ByteBuf writePayload(
final ByteBufAllocator alloc,
final long requestId,
final MessageType messageType,
final byte[] payload) {
final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
buf.writeInt(frameLength);
writeHeader(buf, messageType);
buf.writeLong(requestId);
buf.writeBytes(payload);
return buf;
}
// ------------------------------------------------------------------------
// Deserialization
// ------------------------------------------------------------------------
......@@ -230,71 +235,54 @@ public final class MessageSerializer {
// fetching the message type
int msgType = buf.readInt();
MessageType[] values = MessageType.values();
Preconditions.checkState(msgType >= 0 && msgType <= values.length,
Preconditions.checkState(msgType >= 0 && msgType < values.length,
"Illegal message type with index " + msgType + '.');
return values[msgType];
}
/**
* Deserializes the KvState request message.
*
* <p><strong>Important</strong>: the returned buffer is sliced from the
* incoming ByteBuf stream and retained. Therefore, it needs to be recycled
* by the consumer.
*
* @param buf Buffer to deserialize (expected to be positioned after header)
* @return Deserialized KvStateRequest
* De-serializes the header and returns the {@link MessageType}.
* <pre>
* <b>The buffer is expected to be at the request id position.</b>
* </pre>
* @param buf The {@link ByteBuf} containing the serialized request id.
* @return The request id.
*/
public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) {
long requestId = buf.readLong();
KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
// Serialized key and namespace
int length = buf.readInt();
if (length < 0) {
throw new IllegalArgumentException("Negative length for serialized key and namespace. " +
"This indicates a serialization error.");
}
// Copy the buffer in order to be able to safely recycle the ByteBuf
byte[] serializedKeyAndNamespace = new byte[length];
if (length > 0) {
buf.readBytes(serializedKeyAndNamespace);
}
return new KvStateRequest(requestId, kvStateId, serializedKeyAndNamespace);
public static long getRequestId(final ByteBuf buf) {
return buf.readLong();
}
/**
* Deserializes the KvState request result.
*
* @param buf Buffer to deserialize (expected to be positioned after header)
* @return Deserialized KvStateRequestResult
* De-serializes the request sent to the
* {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
* <pre>
* <b>The buffer is expected to be at the request position.</b>
* </pre>
* @param buf The {@link ByteBuf} containing the serialized request.
* @return The request.
*/
public static KvStateRequestResult deserializeKvStateRequestResult(ByteBuf buf) {
long requestId = buf.readLong();
// Serialized KvState
int length = buf.readInt();
if (length < 0) {
throw new IllegalArgumentException("Negative length for serialized result. " +
"This indicates a serialization error.");
}
byte[] serializedValue = new byte[length];
if (length > 0) {
buf.readBytes(serializedValue);
}
public REQ deserializeRequest(final ByteBuf buf) {
Preconditions.checkNotNull(buf);
return requestDeserializer.deserializeMessage(buf);
}
return new KvStateRequestResult(requestId, serializedValue);
/**
* De-serializes the response sent to the
* {@link org.apache.flink.queryablestate.network.Client}.
* <pre>
* <b>The buffer is expected to be at the response position.</b>
* </pre>
* @param buf The {@link ByteBuf} containing the serialized response.
* @return The response.
*/
public RESP deserializeResponse(final ByteBuf buf) {
Preconditions.checkNotNull(buf);
return responseDeserializer.deserializeMessage(buf);
}
/**
* De-serializes the {@link KvStateRequestFailure} sent to the
* {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
* De-serializes the {@link RequestFailure} sent to the
* {@link org.apache.flink.queryablestate.network.Client} in case of
* protocol related errors.
* <pre>
* <b>The buffer is expected to be at the correct position.</b>
......@@ -302,7 +290,7 @@ public final class MessageSerializer {
* @param buf The {@link ByteBuf} containing the serialized failure message.
* @return The failure message.
*/
public static KvStateRequestFailure deserializeKvStateRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
long requestId = buf.readLong();
Throwable cause;
......@@ -310,12 +298,12 @@ public final class MessageSerializer {
ObjectInputStream in = new ObjectInputStream(bis)) {
cause = (Throwable) in.readObject();
}
return new KvStateRequestFailure(requestId, cause);
return new RequestFailure(requestId, cause);
}
/**
* De-serializes the failure message sent to the
* {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
* {@link org.apache.flink.queryablestate.network.Client} in case of
* server related errors.
* <pre>
* <b>The buffer is expected to be at the correct position.</b>
......
......@@ -18,11 +18,14 @@
package org.apache.flink.queryablestate.network.messages;
import org.apache.flink.annotation.Internal;
/**
* Expected message types during the communication between
* {@link org.apache.flink.queryablestate.client.KvStateClient state client} and
* {@link org.apache.flink.queryablestate.server.KvStateServerImpl state server}.
* {@link org.apache.flink.queryablestate.network.Client client} and
* {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
*/
@Internal
public enum MessageType {
/** The message is a request. */
......
......@@ -16,12 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.queryablestate.messages;
package org.apache.flink.queryablestate.network.messages;
import org.apache.flink.annotation.Internal;
/**
* A failure response to a {@link KvStateRequest}.
* A message indicating a protocol-related error.
*/
public final class KvStateRequestFailure {
@Internal
public class RequestFailure {
/** ID of the request responding to. */
private final long requestId;
......@@ -30,12 +33,12 @@ public final class KvStateRequestFailure {
private final Throwable cause;
/**
* Creates a failure response to a {@link KvStateRequest}.
* Creates a failure response to a {@link MessageBody}.
*
* @param requestId ID for the request responding to
* @param cause Failure cause (not allowed to be a user type)
*/
public KvStateRequestFailure(long requestId, Throwable cause) {
public RequestFailure(long requestId, Throwable cause) {
this.requestId = requestId;
this.cause = cause;
}
......@@ -60,7 +63,7 @@ public final class KvStateRequestFailure {
@Override
public String toString() {
return "KvStateRequestFailure{" +
return "RequestFailure{" +
"requestId=" + requestId +
", cause=" + cause +
'}';
......
......@@ -18,31 +18,25 @@
package org.apache.flink.queryablestate.server;
import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
import org.apache.flink.queryablestate.UnknownKvStateID;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
import org.apache.flink.queryablestate.UnknownKvStateIdException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
/**
* This handler dispatches asynchronous tasks, which query {@link InternalKvState}
......@@ -52,257 +46,62 @@ import java.util.concurrent.TimeUnit;
* query task. The actual query is handled in a separate thread as it might
* otherwise block the network threads (file I/O etc.).
*/
@Internal
@ChannelHandler.Sharable
public class KvStateServerHandler extends ChannelInboundHandlerAdapter {
public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
/** KvState registry holding references to the KvState instances. */
private final KvStateRegistry registry;
/** Thread pool for query execution. */
private final ExecutorService queryExecutor;
/** Exposed server statistics. */
private final KvStateRequestStats stats;
/**
* Create the handler.
* Create the handler used by the {@link KvStateServerImpl}.
*
* @param kvStateRegistry Registry to query.
* @param queryExecutor Thread pool for query execution.
* @param stats Exposed server statistics.
* @param server the {@link KvStateServerImpl} using the handler.
* @param kvStateRegistry registry to query.
* @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
* @param stats server statistics collector.
*/
public KvStateServerHandler(
KvStateRegistry kvStateRegistry,
ExecutorService queryExecutor,
KvStateRequestStats stats) {
final KvStateServerImpl server,
final KvStateRegistry kvStateRegistry,
final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
final KvStateRequestStats stats) {
this.registry = Objects.requireNonNull(kvStateRegistry, "KvStateRegistry");
this.queryExecutor = Objects.requireNonNull(queryExecutor, "Query thread pool");
this.stats = Objects.requireNonNull(stats, "KvStateRequestStats");
super(server, serializer, stats);
this.registry = Preconditions.checkNotNull(kvStateRegistry);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
stats.reportActiveConnection();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
stats.reportInactiveConnection();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
KvStateRequest request = null;
public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) {
final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
try {
ByteBuf buf = (ByteBuf) msg;
MessageType msgType = MessageSerializer.deserializeHeader(buf);
if (msgType == MessageType.REQUEST) {
// ------------------------------------------------------------
// Request
// ------------------------------------------------------------
request = MessageSerializer.deserializeKvStateRequest(buf);
stats.reportRequest();
InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
if (kvState != null) {
// Execute actual query async, because it is possibly
// blocking (e.g. file I/O).
//
// A submission failure is not treated as fatal.
queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats));
} else {
ByteBuf unknown = MessageSerializer.serializeKvStateRequestFailure(
ctx.alloc(),
request.getRequestId(),
new UnknownKvStateID(request.getKvStateId()));
ctx.writeAndFlush(unknown);
stats.reportFailedRequest();
}
final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
// ------------------------------------------------------------
// Unexpected
// ------------------------------------------------------------
ByteBuf failure = MessageSerializer.serializeServerFailure(
ctx.alloc(),
new IllegalArgumentException("Unexpected message type " + msgType
+ ". KvStateServerHandler expects "
+ MessageType.REQUEST + " messages."));
ctx.writeAndFlush(failure);
}
} catch (Throwable t) {
String stringifiedCause = ExceptionUtils.stringifyException(t);
ByteBuf err;
if (request != null) {
String errMsg = "Failed to handle incoming request with ID " +
request.getRequestId() + ". Caused by: " + stringifiedCause;
err = MessageSerializer.serializeKvStateRequestFailure(
ctx.alloc(),
request.getRequestId(),
new RuntimeException(errMsg));
stats.reportFailedRequest();
} else {
String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause;
err = MessageSerializer.serializeServerFailure(
ctx.alloc(),
new RuntimeException(errMsg));
}
ctx.writeAndFlush(err);
} finally {
// IMPORTANT: We have to always recycle the incoming buffer.
// Otherwise we will leak memory out of Netty's buffer pool.
//
// If any operation ever holds on to the buffer, it is the
// responsibility of that operation to retain the buffer and
// release it later.
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
String stringifiedCause = ExceptionUtils.stringifyException(cause);
String msg = "Exception in server pipeline. Caused by: " + stringifiedCause;
ByteBuf err = MessageSerializer.serializeServerFailure(
ctx.alloc(),
new RuntimeException(msg));
ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
}
/**
* Task to execute the actual query against the {@link InternalKvState} instance.
*/
private static class AsyncKvStateQueryTask implements Runnable {
private final ChannelHandlerContext ctx;
private final KvStateRequest request;
private final InternalKvState<?> kvState;
private final KvStateRequestStats stats;
private final long creationNanos;
public AsyncKvStateQueryTask(
ChannelHandlerContext ctx,
KvStateRequest request,
InternalKvState<?> kvState,
KvStateRequestStats stats) {
this.ctx = Objects.requireNonNull(ctx, "Channel handler context");
this.request = Objects.requireNonNull(request, "State query");
this.kvState = Objects.requireNonNull(kvState, "KvState");
this.stats = Objects.requireNonNull(stats, "State query stats");
this.creationNanos = System.nanoTime();
}
@Override
public void run() {
boolean success = false;
try {
if (!ctx.channel().isActive()) {
return;
}
// Query the KvState instance
byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
if (serializedResult != null) {
// We found some data, success!
ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
ctx.alloc(),
request.getRequestId(),
serializedResult);
int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
ChannelFuture write;
if (buf.readableBytes() <= highWatermark) {
write = ctx.writeAndFlush(buf);
} else {
write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark));
}
write.addListener(new QueryResultWriteListener());
success = true;
responseFuture.complete(new KvStateResponse(serializedResult));
} else {
// No data for the key/namespace. This is considered to be
// a failure.
ByteBuf unknownKey = MessageSerializer.serializeKvStateRequestFailure(
ctx.alloc(),
request.getRequestId(),
new UnknownKeyOrNamespace());
ctx.writeAndFlush(unknownKey);
}
} catch (Throwable t) {
try {
String stringifiedCause = ExceptionUtils.stringifyException(t);
String errMsg = "Failed to query state backend for query " +
request.getRequestId() + ". Caused by: " + stringifiedCause;
ByteBuf err = MessageSerializer.serializeKvStateRequestFailure(
ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg));
ctx.writeAndFlush(err);
} catch (IOException e) {
LOG.error("Failed to respond with the error after failed to query state backend", e);
}
} finally {
if (!success) {
stats.reportFailedRequest();
responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
}
}
return responseFuture;
} catch (Throwable t) {
String errMsg = "Error while processing request with ID " + requestId +
". Caused by: " + ExceptionUtils.stringifyException(t);
responseFuture.completeExceptionally(new RuntimeException(errMsg));
return responseFuture;
}
}
@Override
public String toString() {
return "AsyncKvStateQueryTask{" +
", request=" + request +
", creationNanos=" + creationNanos +
'}';
}
/**
* Callback after query result has been written.
*
* <p>Gathers stats and logs errors.
*/
private class QueryResultWriteListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
long durationNanos = System.nanoTime() - creationNanos;
long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
if (future.isSuccess()) {
stats.reportSuccessfulRequest(durationMillis);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Query " + request + " failed after " + durationMillis + " ms", future.cause());
}
stats.reportFailedRequest();
}
}
}
@Override
public void shutdown() {
// do nothing
}
}
......@@ -18,213 +18,93 @@
package org.apache.flink.queryablestate.server;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* Netty-based server answering {@link KvStateRequest} messages.
*
* <p>Requests are handled by asynchronous query tasks (see {@link KvStateServerHandler.AsyncKvStateQueryTask})
* that are executed by a separate query Thread pool. This pool is shared among
* all TCP connections.
*
* <p>The incoming pipeline looks as follows:
* <pre>
* Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
* </pre>
*
* <p>Received binary messages are expected to contain a frame length field. Netty's
* {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
* giving it to our {@link KvStateServerHandler}.
*
* <p>Connections are established and closed by the client. The server only
* closes the connection on a fatal failure that cannot be recovered. A
* server-side connection close is considered a failure by the client.
* The default implementation of the {@link KvStateServer}.
*/
public class KvStateServerImpl implements KvStateServer {
private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class);
@Internal
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
/** Server config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
/** Server config: high water mark. */
private static final int HIGH_WATER_MARK = 32 * 1024;
/** The {@link KvStateRegistry} to query for state instances. */
private final KvStateRegistry kvStateRegistry;
/** Netty's ServerBootstrap. */
private final ServerBootstrap bootstrap;
private final KvStateRequestStats stats;
/** Query executor thread pool. */
private final ExecutorService queryExecutor;
/** Address of this server. */
private KvStateServerAddress serverAddress;
private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
/**
* Creates the {@link KvStateServer}.
* Creates the state server.
*
* <p>The server is instantiated using reflection by the
* {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)
* QueryableStateUtils.startKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}.
*
* <p>The server needs to be started via {@link #start()} in order to bind
* to the configured bind address.
*
* @param bindAddress Address to bind to
* @param bindPort Port to bind to. Pick random port if 0.
* @param numEventLoopThreads Number of event loop threads
* @param numQueryThreads Number of query threads
* @param kvStateRegistry KvStateRegistry to query for KvState instances
* @param stats Statistics tracker
* @param bindAddress the address to listen to.
* @param bindPort the port to listen to.
* @param numEventLoopThreads number of event loop threads.
* @param numQueryThreads number of query threads.
* @param kvStateRegistry {@link KvStateRegistry} to query for state instances.
* @param stats the statistics collector.
*/
public KvStateServerImpl(
InetAddress bindAddress,
Integer bindPort,
Integer numEventLoopThreads,
Integer numQueryThreads,
KvStateRegistry kvStateRegistry,
KvStateRequestStats stats) {
Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort +
" is out of valid port range (0-65536).");
Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry");
Preconditions.checkNotNull(stats, "KvStateRequestStats");
NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Flink KvStateServer EventLoop Thread %d")
.build();
NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
queryExecutor = createQueryExecutor(numQueryThreads);
// Shared between all channels
KvStateServerHandler serverHandler = new KvStateServerHandler(
kvStateRegistry,
queryExecutor,
stats);
bootstrap = new ServerBootstrap()
// Bind address and port
.localAddress(bindAddress, bindPort)
// NIO server channels
.group(nioGroup)
.channel(NioServerSocketChannel.class)
// Server channel Options
.option(ChannelOption.ALLOCATOR, bufferPool)
// Child channel options
.childOption(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
// See initializer for pipeline details
.childHandler(new KvStateServerChannelInitializer(serverHandler));
final InetAddress bindAddress,
final Integer bindPort,
final Integer numEventLoopThreads,
final Integer numQueryThreads,
final KvStateRegistry kvStateRegistry,
final KvStateRequestStats stats) {
super("Queryable State Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads);
this.stats = Preconditions.checkNotNull(stats);
this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
}
@Override
public void start() throws InterruptedException {
Channel channel = bootstrap.bind().sync().channel();
InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
this.serializer = new MessageSerializer<>(
new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
new KvStateResponse.KvStateResponseDeserializer());
return new KvStateServerHandler(this, kvStateRegistry, serializer, stats);
}
@Override
public KvStateServerAddress getAddress() {
if (serverAddress == null) {
throw new IllegalStateException("KvStateServer not started yet.");
}
return serverAddress;
public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started.");
return serializer;
}
@Override
public void shutDown() {
if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
if (group != null) {
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
}
}
if (queryExecutor != null) {
queryExecutor.shutdown();
}
serverAddress = null;
public void start() throws InterruptedException {
super.start();
}
/**
* Creates a thread pool for the query execution.
*
* @param numQueryThreads Number of query threads.
* @return Thread pool for query execution
*/
private static ExecutorService createQueryExecutor(int numQueryThreads) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Flink KvStateServer Query Thread %d")
.build();
return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
@Override
public KvStateServerAddress getServerAddress() {
return super.getServerAddress();
}
/**
* Channel pipeline initializer.
*
* <p>The request handler is shared, whereas the other handlers are created
* per channel.
*/
private static final class KvStateServerChannelInitializer extends ChannelInitializer<SocketChannel> {
/** The shared request handler. */
private final KvStateServerHandler sharedRequestHandler;
/**
* Creates the channel pipeline initializer with the shared request handler.
*
* @param sharedRequestHandler Shared request handler.
*/
public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) {
this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "Request handler");
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ChunkedWriteHandler())
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addLast(sharedRequestHandler);
}
@Override
public void shutdown() {
super.shutdown();
}
}
......@@ -445,4 +445,20 @@ public class FutureUtils {
return result;
}
// ------------------------------------------------------------------------
// Future Completed with an exception.
// ------------------------------------------------------------------------
/**
* Returns a {@link CompletableFuture} that has failed with the exception
* provided as argument.
* @param throwable the exception to fail the future with.
* @return The failed future.
*/
public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
CompletableFuture<T> failedAttempt = new CompletableFuture<>();
failedAttempt.completeExceptionally(throwable);
return failedAttempt;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册