提交 e0333a41 编写于 作者: C Chesnay Schepler

[FLINK-16807][e2e] Improve reporting for instantiation errors

上级 bb195633
...@@ -30,7 +30,8 @@ public interface KafkaResourceFactory { ...@@ -30,7 +30,8 @@ public interface KafkaResourceFactory {
* Returns a {@link KafkaResource} instance. If the instance could not be instantiated (for example, because a * Returns a {@link KafkaResource} instance. If the instance could not be instantiated (for example, because a
* mandatory parameter was missing), then an empty {@link Optional} should be returned. * mandatory parameter was missing), then an empty {@link Optional} should be returned.
* *
* @return KafkaResource instance, or an empty Optional if the instance could not be instantiated * @return KafkaResource instance
* @throws Exception if the instance could not be instantiated
*/ */
Optional<KafkaResource> create(String kafkaVersion); KafkaResource create(String kafkaVersion) throws Exception;
} }
...@@ -21,8 +21,6 @@ package org.apache.flink.tests.util.kafka; ...@@ -21,8 +21,6 @@ package org.apache.flink.tests.util.kafka;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Optional;
/** /**
* A {@link KafkaResourceFactory} for the {@link LocalStandaloneKafkaResourceFactory}. * A {@link KafkaResourceFactory} for the {@link LocalStandaloneKafkaResourceFactory}.
*/ */
...@@ -30,8 +28,8 @@ public final class LocalStandaloneKafkaResourceFactory implements KafkaResourceF ...@@ -30,8 +28,8 @@ public final class LocalStandaloneKafkaResourceFactory implements KafkaResourceF
private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class); private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class);
@Override @Override
public Optional<KafkaResource> create(final String kafkaVersion) { public KafkaResource create(final String kafkaVersion) {
LOG.info("Created {}.", LocalStandaloneKafkaResource.class.getSimpleName()); LOG.info("Created {}.", LocalStandaloneKafkaResource.class.getSimpleName());
return Optional.of(new LocalStandaloneKafkaResource(kafkaVersion)); return new LocalStandaloneKafkaResource(kafkaVersion);
} }
} }
...@@ -81,8 +81,7 @@ public class SQLClientKafkaITCase extends TestLogger { ...@@ -81,8 +81,7 @@ public class SQLClientKafkaITCase extends TestLogger {
@Rule @Rule
public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory() public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory()
.create(FlinkResourceSetup.builder().build()) .create(FlinkResourceSetup.builder().build());
.get();
@Rule @Rule
public final KafkaResource kafka; public final KafkaResource kafka;
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.flink.tests.util.cache; package org.apache.flink.tests.util.cache;
import java.util.Optional;
/** /**
* A factory for {@link DownloadCache} implementations. * A factory for {@link DownloadCache} implementations.
*/ */
...@@ -28,9 +26,10 @@ public interface DownloadCacheFactory { ...@@ -28,9 +26,10 @@ public interface DownloadCacheFactory {
/** /**
* Returns a {@link DownloadCache} instance. If the instance could not be instantiated (for example, because a * Returns a {@link DownloadCache} instance. If the instance could not be instantiated (for example, because a
* mandatory parameter was missing), then an empty {@link Optional} should be returned. * mandatory parameter was missing), then an exception should be thrown.
* *
* @return DownloadCache instance, or an empty Optional if the instance could not be instantiated * @return DownloadCache instance
* @throws Exception if the instance could not be instantiated
*/ */
Optional<DownloadCache> create(); DownloadCache create() throws Exception;
} }
...@@ -23,7 +23,6 @@ import org.slf4j.Logger; ...@@ -23,7 +23,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
/** /**
* A {@link DownloadCacheFactory} for the {@link LolCache}. * A {@link DownloadCacheFactory} for the {@link LolCache}.
...@@ -32,14 +31,10 @@ public final class LolCacheFactory implements DownloadCacheFactory { ...@@ -32,14 +31,10 @@ public final class LolCacheFactory implements DownloadCacheFactory {
private static final Logger LOG = LoggerFactory.getLogger(LolCacheFactory.class); private static final Logger LOG = LoggerFactory.getLogger(LolCacheFactory.class);
@Override @Override
public Optional<DownloadCache> create() { public DownloadCache create() throws IOException {
final TemporaryFolder folder = new TemporaryFolder(); final TemporaryFolder folder = new TemporaryFolder();
try { folder.create();
folder.create();
} catch (IOException e) {
throw new RuntimeException("Could not initialize temporary directory.", e);
}
LOG.info("Created {}.", LolCache.class.getSimpleName()); LOG.info("Created {}.", LolCache.class.getSimpleName());
return Optional.of(new LolCache(folder)); return new LolCache(folder);
} }
} }
...@@ -40,14 +40,14 @@ public final class PersistingDownloadCacheFactory implements DownloadCacheFactor ...@@ -40,14 +40,14 @@ public final class PersistingDownloadCacheFactory implements DownloadCacheFactor
private static final Period TIME_TO_LIVE_DEFAULT = Period.ZERO; private static final Period TIME_TO_LIVE_DEFAULT = Period.ZERO;
@Override @Override
public Optional<DownloadCache> create() { public DownloadCache create() {
final Optional<Path> tmpDir = TMP_DIR.get(); final Optional<Path> tmpDir = TMP_DIR.get();
final Period timeToLive = TIME_TO_LIVE.get(TIME_TO_LIVE_DEFAULT); final Period timeToLive = TIME_TO_LIVE.get(TIME_TO_LIVE_DEFAULT);
if (!tmpDir.isPresent()) { if (!tmpDir.isPresent()) {
LOG.debug("Not loading {} because {} was not set.", PersistingDownloadCache.class, TMP_DIR.getPropertyName()); LOG.debug("Not loading {} because {} was not set.", PersistingDownloadCache.class, TMP_DIR.getPropertyName());
return Optional.empty(); throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", PersistingDownloadCache.class, TMP_DIR.getPropertyName()));
} }
LOG.info("Created {}.", PersistingDownloadCache.class.getSimpleName()); LOG.info("Created {}.", PersistingDownloadCache.class.getSimpleName());
return Optional.of(new PersistingDownloadCache(tmpDir.get(), timeToLive)); return new PersistingDownloadCache(tmpDir.get(), timeToLive);
} }
} }
...@@ -38,23 +38,23 @@ public final class TravisDownloadCacheFactory implements DownloadCacheFactory { ...@@ -38,23 +38,23 @@ public final class TravisDownloadCacheFactory implements DownloadCacheFactory {
private static final ParameterProperty<Integer> BUILD_NUMBER = new ParameterProperty<>("TRAVIS_BUILD_NUMBER", Integer::parseInt); private static final ParameterProperty<Integer> BUILD_NUMBER = new ParameterProperty<>("TRAVIS_BUILD_NUMBER", Integer::parseInt);
@Override @Override
public Optional<DownloadCache> create() { public DownloadCache create() {
final Optional<Path> tmpDir = TMP_DIR.get(); final Optional<Path> tmpDir = TMP_DIR.get();
final Optional<Integer> timeToLive = BUILDS_TO_LIVE.get(); final Optional<Integer> timeToLive = BUILDS_TO_LIVE.get();
final Optional<Integer> buildNumber = BUILD_NUMBER.get(); final Optional<Integer> buildNumber = BUILD_NUMBER.get();
if (!tmpDir.isPresent()) { if (!tmpDir.isPresent()) {
LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, TMP_DIR.getPropertyName()); LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, TMP_DIR.getPropertyName());
return Optional.empty(); throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", TravisDownloadCache.class, TMP_DIR.getPropertyName()));
} }
if (!timeToLive.isPresent()) { if (!timeToLive.isPresent()) {
LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, BUILDS_TO_LIVE.getPropertyName()); LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, BUILDS_TO_LIVE.getPropertyName());
return Optional.empty(); throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", TravisDownloadCache.class, BUILDS_TO_LIVE.getPropertyName()));
} }
if (!buildNumber.isPresent()) { if (!buildNumber.isPresent()) {
LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, BUILD_NUMBER.getPropertyName()); LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, BUILD_NUMBER.getPropertyName());
return Optional.empty(); throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", TravisDownloadCache.class, BUILD_NUMBER.getPropertyName()));
} }
LOG.info("Created {}.", TravisDownloadCache.class.getSimpleName()); LOG.info("Created {}.", TravisDownloadCache.class.getSimpleName());
return Optional.of(new TravisDownloadCache(tmpDir.get(), timeToLive.get(), buildNumber.get())); return new TravisDownloadCache(tmpDir.get(), timeToLive.get(), buildNumber.get());
} }
} }
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.flink.tests.util.flink; package org.apache.flink.tests.util.flink;
import java.util.Optional;
/** /**
* A factory for {@link FlinkResource} implementations. * A factory for {@link FlinkResource} implementations.
*/ */
...@@ -28,10 +26,11 @@ public interface FlinkResourceFactory { ...@@ -28,10 +26,11 @@ public interface FlinkResourceFactory {
/** /**
* Returns a {@link FlinkResource} instance. If the instance could not be instantiated (for example, because a * Returns a {@link FlinkResource} instance. If the instance could not be instantiated (for example, because a
* mandatory parameter was missing), then an empty {@link Optional} should be returned. * mandatory parameter was missing), then an exception should be thrown.
* *
* @param setup setup instructions for the FlinkResource * @param setup setup instructions for the FlinkResource
* @return FlinkResource instance, or an empty Optional if the instance could not be instantiated * @return FlinkResource instance,
* @throws Exception if the instance could not be instantiated
*/ */
Optional<FlinkResource> create(FlinkResourceSetup setup); FlinkResource create(FlinkResourceSetup setup) throws Exception;
} }
...@@ -37,17 +37,17 @@ public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceF ...@@ -37,17 +37,17 @@ public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceF
private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get); private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get);
@Override @Override
public Optional<FlinkResource> create(FlinkResourceSetup setup) { public FlinkResource create(FlinkResourceSetup setup) {
Optional<Path> distributionDirectory = DISTRIBUTION_DIRECTORY.get(); Optional<Path> distributionDirectory = DISTRIBUTION_DIRECTORY.get();
if (!distributionDirectory.isPresent()) { if (!distributionDirectory.isPresent()) {
LOG.warn("The distDir property was not set. You can set it when running maven via -DdistDir=<path> ."); LOG.warn("The distDir property was not set. You can set it when running maven via -DdistDir=<path> .");
return Optional.empty(); throw new IllegalArgumentException("The distDir property was not set. You can set it when running maven via -DdistDir=<path> .");
} }
Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
if (!logBackupDirectory.isPresent()) { if (!logBackupDirectory.isPresent()) {
LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
} }
LOG.info("Created {}.", LocalStandaloneFlinkResource.class.getSimpleName()); LOG.info("Created {}.", LocalStandaloneFlinkResource.class.getSimpleName());
return Optional.of(new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup)); return new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup);
} }
} }
...@@ -18,13 +18,10 @@ ...@@ -18,13 +18,10 @@
package org.apache.flink.tests.util.util; package org.apache.flink.tests.util.util;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/** /**
* Utilities for factories. * Utilities for factories.
...@@ -43,24 +40,43 @@ public enum FactoryUtils { ...@@ -43,24 +40,43 @@ public enum FactoryUtils {
* @throws RuntimeException if no or multiple resources could be instantiated * @throws RuntimeException if no or multiple resources could be instantiated
* @return created instance * @return created instance
*/ */
public static <R, F> R loadAndInvokeFactory(final Class<F> factoryInterface, final Function<F, Optional<R>> factoryInvoker, final Supplier<F> defaultProvider) { public static <R, F> R loadAndInvokeFactory(final Class<F> factoryInterface, final FactoryInvoker<F, R> factoryInvoker, final Supplier<F> defaultProvider) {
final ServiceLoader<F> factories = ServiceLoader.load(factoryInterface); final ServiceLoader<F> factories = ServiceLoader.load(factoryInterface);
final List<R> resources = StreamSupport.stream(factories.spliterator(), false) final List<R> instantiatedResources = new ArrayList<>();
.map(factoryInvoker) final List<Exception> errorsDuringInitialization = new ArrayList<>();
.filter(Optional::isPresent) for (F factory : factories) {
.map(Optional::get) try {
.collect(Collectors.toList()); R resource = factoryInvoker.invoke(factory);
instantiatedResources.add(resource);
} catch (Exception e) {
errorsDuringInitialization.add(e);
}
}
if (resources.size() == 1) { if (instantiatedResources.size() == 1) {
return resources.get(0); return instantiatedResources.get(0);
} }
if (resources.isEmpty()) { if (instantiatedResources.isEmpty()) {
return factoryInvoker.apply(defaultProvider.get()) try {
.orElseThrow(() -> new RuntimeException("Could not instantiate instance using default factory.")); return factoryInvoker.invoke(defaultProvider.get());
} catch (Exception e) {
final RuntimeException exception = new RuntimeException("Could not instantiate any instance.");
final RuntimeException defaultException = new RuntimeException("Could not instantiate default instance.", e);
exception.addSuppressed(defaultException);
errorsDuringInitialization.forEach(exception::addSuppressed);
throw exception;
}
} }
throw new RuntimeException("Multiple instances were created: " + resources); throw new RuntimeException("Multiple instances were created: " + instantiatedResources);
}
/**
* Interface for invoking the factory.
*/
public interface FactoryInvoker<F, R> {
R invoke(F factory) throws Exception;
} }
} }
...@@ -76,8 +76,7 @@ public class MetricsAvailabilityITCase extends TestLogger { ...@@ -76,8 +76,7 @@ public class MetricsAvailabilityITCase extends TestLogger {
@Rule @Rule
public final FlinkResource dist = new LocalStandaloneFlinkResourceFactory() public final FlinkResource dist = new LocalStandaloneFlinkResourceFactory()
.create(FlinkResourceSetup.builder().build()) .create(FlinkResourceSetup.builder().build());
.get();
@Nullable @Nullable
private static ScheduledExecutorService scheduledExecutorService = null; private static ScheduledExecutorService scheduledExecutorService = null;
......
...@@ -152,7 +152,7 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { ...@@ -152,7 +152,7 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder(); final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder();
params.getBuilderSetup().accept(builder); params.getBuilderSetup().accept(builder);
builder.addConfiguration(getFlinkConfig(params.getInstantiationType())); builder.addConfiguration(getFlinkConfig(params.getInstantiationType()));
dist = new LocalStandaloneFlinkResourceFactory().create(builder.build()).get(); dist = new LocalStandaloneFlinkResourceFactory().create(builder.build());
} }
@Rule @Rule
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册