提交 a0838de7 编写于 作者: K kkloudas

[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.

上级 81dc260d
......@@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
try {
stats.reportFailedRequest();
final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
final String errMsg = "Failed request " + requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t);
final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
ctx.writeAndFlush(err);
} catch (IOException io) {
......
......@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
......@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
if (throwable instanceof CancellationException) {
result.completeExceptionally(throwable);
} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
if (
throwable.getCause() instanceof UnknownKvStateIdException ||
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
throwable.getCause() instanceof UnknownKvStateLocation ||
throwable.getCause() instanceof ConnectException) {
throwable.getCause() instanceof ConnectException
) {
// These failures are likely to be caused by out-of-sync
// KvStateLocation. Therefore we retry this query and
......
......@@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
......@@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Supplier;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
......@@ -103,15 +100,14 @@ import scala.reflect.ClassTag$;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Base class for queryable state integration tests with a configurable state backend.
*/
public abstract class AbstractQueryableStateTestBase extends TestLogger {
private static final int NO_OF_RETRIES = 100;
private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
......@@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
allNonZero = false;
}
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
deadline,
client,
jobId,
queryName,
key,
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
QUERY_RETRY_DELAY,
false,
executor);
......@@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*
* <b>NOTE: </b> This test is only in the non-HA variant of the tests because
* in the HA mode we use the actual JM code which does not recognize the
* {@code NotifyWhenJobStatus} message. *
* {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
......@@ -438,6 +434,92 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
}
}
/**
 * Tests that the correct exception is thrown if the query
 * contains a wrong queryable state name.
 *
 * <p>The job registers its state under the name {@code "hakuna"}, while the
 * query asks for {@code "wrong-hankuna"}. The returned future is expected to
 * complete exceptionally with a {@link RuntimeException} whose message
 * mentions {@code UnknownKvStateLocation}.
 *
 * @throws Exception if submitting, querying or cancelling the job fails unexpectedly
 */
@Test
public void testWrongQueryableStateName() throws Exception {
	// Config
	final Deadline deadline = TEST_TIMEOUT.fromNow();
	final long numElements = 1024L;

	JobID jobId = null;
	try {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStateBackend(stateBackend);
		env.setParallelism(maxParallelism);
		// Very important, because cluster is shared between tests and we
		// don't explicitly check that all slots are available before
		// submitting.
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

		DataStream<Tuple2<Integer, Long>> source = env
			.addSource(new TestAscendingValueSource(numElements));

		// Value state
		ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
			new ValueStateDescriptor<>("any", source.getType());

		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
			private static final long serialVersionUID = 7662520075515707428L;

			@Override
			public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
				return value.f0;
			}
		}).asQueryableState("hakuna", valueState);

		// Submit the job graph
		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
		jobId = jobGraph.getJobID();

		// Register for the RUNNING notification before submitting, so the
		// status change cannot be missed.
		CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
			cluster.getLeaderGateway(deadline.timeLeft())
				.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
				.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));

		cluster.submitJobDetached(jobGraph);

		// Wait for the job to be running.
		TestingJobManagerMessages.JobStatusIs jobStatus =
			runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		assertEquals(JobStatus.RUNNING, jobStatus.state());

		CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
			jobId,
			"wrong-hankuna", // this is the wrong name.
			0,
			VoidNamespace.INSTANCE,
			BasicTypeInfo.INT_TYPE_INFO,
			VoidNamespaceTypeInfo.INSTANCE,
			valueState);

		try {
			// Bound the wait by the test deadline so that a query which never
			// completes fails the test instead of hanging it.
			future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
			fail("The query for a wrong queryable state name should have failed.");
		} catch (ExecutionException e) {
			Assert.assertTrue(
				"Unexpected cause: " + e.getCause(),
				e.getCause() instanceof RuntimeException);
			Assert.assertTrue(
				"Unexpected message: " + e.getCause().getMessage(),
				e.getCause().getMessage().contains(
					"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
		} catch (Exception unexpected) {
			// Includes a TimeoutException from the bounded get() above.
			fail("Unexpected type of exception: " + unexpected);
		}

	} finally {
		// Free cluster resources
		if (jobId != null) {
			CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
				.getLeaderGateway(deadline.timeLeft())
				.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
				.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));

			cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		}
	}
}
/**
* Similar tests as {@link #testValueState()} but before submitting the
* job, we already issue one request which fails.
......@@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
// Now query
int key = 0;
CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
deadline,
client,
jobId,
queryableState.getQueryableStateName(),
key,
BasicTypeInfo.INT_TYPE_INFO,
valueState,
QUERY_RETRY_DELAY,
true,
executor);
......@@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvState(
deadline,
client,
jobId,
"pumba",
key,
BasicTypeInfo.INT_TYPE_INFO,
foldingState,
QUERY_RETRY_DELAY,
false,
executor);
......@@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
deadline,
client,
jobId,
"jungle",
key,
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
QUERY_RETRY_DELAY,
false,
executor);
......@@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState(
deadline,
client,
jobId,
"timon-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
mapStateDescriptor,
QUERY_RETRY_DELAY,
false,
executor);
......@@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
final CompletableFuture<ListState<Long>> future = getKvState(
deadline,
client,
jobId,
"list-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
listStateDescriptor,
QUERY_RETRY_DELAY,
false,
executor);
......@@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState(
deadline,
client,
jobId,
"aggr-queryable",
key,
BasicTypeInfo.INT_TYPE_INFO,
aggrStateDescriptor,
QUERY_RETRY_DELAY,
false,
executor);
......@@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
///// General Utility Methods //////
private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
private static <K, S extends State, V> CompletableFuture<S> getKvState(
final Deadline deadline,
final QueryableStateClient client,
final JobID jobId,
final String queryName,
final K key,
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final Time retryDelay,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) {
return retryWithDelay(
() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
NO_OF_RETRIES,
retryDelay,
executor,
failForUnknownKeyOrNamespace);
}
private static <T> CompletableFuture<T> retryWithDelay(
final Supplier<CompletableFuture<T>> operation,
final int retries,
final Time retryDelay,
final ScheduledExecutor scheduledExecutor,
final boolean failIfUnknownKeyOrNamespace) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
retryWithDelay(
resultFuture,
operation,
retries,
retryDelay,
scheduledExecutor,
failIfUnknownKeyOrNamespace);
final ScheduledExecutor executor) throws InterruptedException {
final CompletableFuture<S> resultFuture = new CompletableFuture<>();
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
return resultFuture;
}
public static <T> void retryWithDelay(
final CompletableFuture<T> resultFuture,
final Supplier<CompletableFuture<T>> operation,
final int retries,
final Time retryDelay,
final ScheduledExecutor scheduledExecutor,
final boolean failIfUnknownKeyOrNamespace) {
private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
final Deadline deadline,
final CompletableFuture<S> resultFuture,
final QueryableStateClient client,
final JobID jobId,
final String queryName,
final K key,
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
if (!resultFuture.isDone()) {
final CompletableFuture<T> operationResultFuture = operation.get();
operationResultFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
if (throwable.getCause() instanceof CancellationException) {
resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
} else if (throwable.getCause() instanceof AssertionError ||
(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) {
resultFuture.completeExceptionally(throwable.getCause());
} else {
if (retries > 0) {
final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
() -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
retryDelay.toMilliseconds(),
TimeUnit.MILLISECONDS);
resultFuture.whenComplete(
(innerT, innerThrowable) -> scheduledFuture.cancel(false));
} else {
resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
"has been exhausted.", throwable));
}
}
} else {
resultFuture.complete(t);
Thread.sleep(100L);
CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
if (
throwable.getCause() instanceof CancellationException ||
throwable.getCause() instanceof AssertionError ||
(failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
) {
resultFuture.completeExceptionally(throwable.getCause());
} else if (deadline.hasTimeLeft()) {
try {
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
scheduledExecutor);
}
} else {
resultFuture.complete(result);
}
}, executor);
resultFuture.whenComplete(
(t, throwable) -> operationResultFuture.cancel(false));
resultFuture.whenComplete((result, throwable) -> expected.cancel(false));
}
}
......@@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
deadline,
client,
jobId,
queryableStateName,
key,
BasicTypeInfo.INT_TYPE_INFO,
stateDescriptor,
QUERY_RETRY_DELAY,
false,
executor);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册