diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResourceFactory.java index 3d08d64bb64e3237afd1a5031e9c1a17c01fd45a..ae21a6a6c5e5202bd7cabd79baca99c0c0d95c9a 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResourceFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResourceFactory.java @@ -30,7 +30,8 @@ public interface KafkaResourceFactory { * Returns a {@link KafkaResource} instance. If the instance could not be instantiated (for example, because a * mandatory parameter was missing), then an empty {@link Optional} should be returned. * - * @return KafkaResource instance, or an empty Optional if the instance could not be instantiated + * @return KafkaResource instance + * @throws Exception if the instance could not be instantiated */ - Optional create(String kafkaVersion); + KafkaResource create(String kafkaVersion) throws Exception; } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java index 0d61119155c8262514de3e326d69180f33dcafbc..2e274434f36f3c3332e6051369c318c80b7ce2e2 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java @@ -21,8 +21,6 @@ package org.apache.flink.tests.util.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; - /** * A {@link KafkaResourceFactory} for the {@link LocalStandaloneKafkaResourceFactory}. */ @@ -30,8 +28,8 @@ public final class LocalStandaloneKafkaResourceFactory implements KafkaResourceF private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class); @Override - public Optional create(final String kafkaVersion) { + public KafkaResource create(final String kafkaVersion) { LOG.info("Created {}.", LocalStandaloneKafkaResource.class.getSimpleName()); - return Optional.of(new LocalStandaloneKafkaResource(kafkaVersion)); + return new LocalStandaloneKafkaResource(kafkaVersion); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java index bf5e4f336b0598f71b82389d26d2bd551503ad32..923377eb01aad69b0667200c040cfc1903453726 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java @@ -81,8 +81,7 @@ public class SQLClientKafkaITCase extends TestLogger { @Rule public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory() - .create(FlinkResourceSetup.builder().build()) - .get(); + .create(FlinkResourceSetup.builder().build()); @Rule public final KafkaResource kafka; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheFactory.java index 82ff48bdb1c97310869e50f5dc9d64d0f3a8ed5f..767e96ac8fbd1ab09180c964f3b5dc0c0d99a6c0 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheFactory.java @@ -18,8 +18,6 @@ package org.apache.flink.tests.util.cache; -import java.util.Optional; - /** * A factory for {@link DownloadCache} implementations. */ @@ -28,9 +26,10 @@ public interface DownloadCacheFactory { /** * Returns a {@link DownloadCache} instance. If the instance could not be instantiated (for example, because a - * mandatory parameter was missing), then an empty {@link Optional} should be returned. + * mandatory parameter was missing), then an exception should be thrown. * - * @return DownloadCache instance, or an empty Optional if the instance could not be instantiated + * @return DownloadCache instance + * @throws Exception if the instance could not be instantiated */ - Optional create(); + DownloadCache create() throws Exception; } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/LolCacheFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/LolCacheFactory.java index 68e4973c539e0e0260ac340b702858871f17d561..b0be5ce8fdd025d36c31a33d37ced6592952ca0a 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/LolCacheFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/LolCacheFactory.java @@ -23,7 +23,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Optional; /** * A {@link DownloadCacheFactory} for the {@link LolCache}. @@ -32,14 +31,10 @@ public final class LolCacheFactory implements DownloadCacheFactory { private static final Logger LOG = LoggerFactory.getLogger(LolCacheFactory.class); @Override - public Optional create() { + public DownloadCache create() throws IOException { final TemporaryFolder folder = new TemporaryFolder(); - try { - folder.create(); - } catch (IOException e) { - throw new RuntimeException("Could not initialize temporary directory.", e); - } + folder.create(); LOG.info("Created {}.", LolCache.class.getSimpleName()); - return Optional.of(new LolCache(folder)); + return new LolCache(folder); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/PersistingDownloadCacheFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/PersistingDownloadCacheFactory.java index 27013f2fabab4ce00a8ef98a2b014fc50514204d..bbc8ab3b8b1e4ee2fb82d263537b85325fcf91b3 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/PersistingDownloadCacheFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/PersistingDownloadCacheFactory.java @@ -40,14 +40,14 @@ public final class PersistingDownloadCacheFactory implements DownloadCacheFactor private static final Period TIME_TO_LIVE_DEFAULT = Period.ZERO; @Override - public Optional create() { + public DownloadCache create() { final Optional tmpDir = TMP_DIR.get(); final Period timeToLive = TIME_TO_LIVE.get(TIME_TO_LIVE_DEFAULT); if (!tmpDir.isPresent()) { LOG.debug("Not loading {} because {} was not set.", PersistingDownloadCache.class, TMP_DIR.getPropertyName()); - return Optional.empty(); + throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", PersistingDownloadCache.class, TMP_DIR.getPropertyName())); } LOG.info("Created {}.", PersistingDownloadCache.class.getSimpleName()); - return Optional.of(new PersistingDownloadCache(tmpDir.get(), timeToLive)); + return new PersistingDownloadCache(tmpDir.get(), timeToLive); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/TravisDownloadCacheFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/TravisDownloadCacheFactory.java index fa1057d10c0101a30c279ed878eaeac97513d72c..90520c536cc3f39e192e815f1356b0e3d043ac89 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/TravisDownloadCacheFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/TravisDownloadCacheFactory.java @@ -38,23 +38,23 @@ public final class TravisDownloadCacheFactory implements DownloadCacheFactory { private static final ParameterProperty BUILD_NUMBER = new ParameterProperty<>("TRAVIS_BUILD_NUMBER", Integer::parseInt); @Override - public Optional create() { + public DownloadCache create() { final Optional tmpDir = TMP_DIR.get(); final Optional timeToLive = BUILDS_TO_LIVE.get(); final Optional buildNumber = BUILD_NUMBER.get(); if (!tmpDir.isPresent()) { LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, TMP_DIR.getPropertyName()); - return Optional.empty(); + throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", TravisDownloadCache.class, TMP_DIR.getPropertyName())); } if (!timeToLive.isPresent()) { LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, BUILDS_TO_LIVE.getPropertyName()); - return Optional.empty(); + throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", TravisDownloadCache.class, BUILDS_TO_LIVE.getPropertyName())); } if (!buildNumber.isPresent()) { LOG.debug("Not loading {} because {} was not set.", TravisDownloadCache.class, BUILD_NUMBER.getPropertyName()); - return Optional.empty(); + throw new IllegalArgumentException(String.format("Not loading %s because %s was not set.", TravisDownloadCache.class, BUILD_NUMBER.getPropertyName())); } LOG.info("Created {}.", TravisDownloadCache.class.getSimpleName()); - return Optional.of(new TravisDownloadCache(tmpDir.get(), timeToLive.get(), buildNumber.get())); + return new TravisDownloadCache(tmpDir.get(), timeToLive.get(), buildNumber.get()); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceFactory.java index 385282be8c66125fd5d3375569862219553d09e9..20bc8dd1362ed6f91398ae8b1dfbc6c78a4ff346 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceFactory.java @@ -18,8 +18,6 @@ package org.apache.flink.tests.util.flink; -import java.util.Optional; - /** * A factory for {@link FlinkResource} implementations. */ @@ -28,10 +26,11 @@ public interface FlinkResourceFactory { /** * Returns a {@link FlinkResource} instance. If the instance could not be instantiated (for example, because a - * mandatory parameter was missing), then an empty {@link Optional} should be returned. + * mandatory parameter was missing), then an exception should be thrown. * * @param setup setup instructions for the FlinkResource - * @return FlinkResource instance, or an empty Optional if the instance could not be instantiated + * @return FlinkResource instance, + * @throws Exception if the instance could not be instantiated */ - Optional create(FlinkResourceSetup setup); + FlinkResource create(FlinkResourceSetup setup) throws Exception; } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java index e4c1e5c02f398be0345d670d3c76bd51f1ad8d8b..2edf693e39c36be94e229c5a01b8c4ac1f3de72a 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java @@ -37,17 +37,17 @@ public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceF private static final ParameterProperty DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get); @Override - public Optional create(FlinkResourceSetup setup) { + public FlinkResource create(FlinkResourceSetup setup) { Optional distributionDirectory = DISTRIBUTION_DIRECTORY.get(); if (!distributionDirectory.isPresent()) { LOG.warn("The distDir property was not set. You can set it when running maven via -DdistDir= ."); - return Optional.empty(); + throw new IllegalArgumentException("The distDir property was not set. You can set it when running maven via -DdistDir= ."); } Optional logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); if (!logBackupDirectory.isPresent()) { LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); } LOG.info("Created {}.", LocalStandaloneFlinkResource.class.getSimpleName()); - return Optional.of(new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup)); + return new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FactoryUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FactoryUtils.java index 079443c5bd77c6d82ab9e6bd7e771a43c8b38292..4100f259cac545a8ecf27d37d2fa8ed7e80cd22d 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FactoryUtils.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FactoryUtils.java @@ -18,13 +18,10 @@ package org.apache.flink.tests.util.util; +import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.ServiceLoader; -import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * Utilities for factories. @@ -43,24 +40,43 @@ public enum FactoryUtils { * @throws RuntimeException if no or multiple resources could be instantiated * @return created instance */ - public static R loadAndInvokeFactory(final Class factoryInterface, final Function> factoryInvoker, final Supplier defaultProvider) { + public static R loadAndInvokeFactory(final Class factoryInterface, final FactoryInvoker factoryInvoker, final Supplier defaultProvider) { final ServiceLoader factories = ServiceLoader.load(factoryInterface); - final List resources = StreamSupport.stream(factories.spliterator(), false) - .map(factoryInvoker) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); + final List instantiatedResources = new ArrayList<>(); + final List errorsDuringInitialization = new ArrayList<>(); + for (F factory : factories) { + try { + R resource = factoryInvoker.invoke(factory); + instantiatedResources.add(resource); + } catch (Exception e) { + errorsDuringInitialization.add(e); + } + } - if (resources.size() == 1) { - return resources.get(0); + if (instantiatedResources.size() == 1) { + return instantiatedResources.get(0); } - if (resources.isEmpty()) { - return factoryInvoker.apply(defaultProvider.get()) - .orElseThrow(() -> new RuntimeException("Could not instantiate instance using default factory.")); + if (instantiatedResources.isEmpty()) { + try { + return factoryInvoker.invoke(defaultProvider.get()); + } catch (Exception e) { + final RuntimeException exception = new RuntimeException("Could not instantiate any instance."); + final RuntimeException defaultException = new RuntimeException("Could not instantiate default instance.", e); + exception.addSuppressed(defaultException); + errorsDuringInitialization.forEach(exception::addSuppressed); + throw exception; + } } - throw new RuntimeException("Multiple instances were created: " + resources); + throw new RuntimeException("Multiple instances were created: " + instantiatedResources); + } + + /** + * Interface for invoking the factory. + */ + public interface FactoryInvoker { + R invoke(F factory) throws Exception; } } diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java index 68c94981f2730eec7508b4bb4302b4810282f450..4d3179596e95b5b90e59deb5edd6a647e506cc1a 100644 --- a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java +++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java @@ -76,8 +76,7 @@ public class MetricsAvailabilityITCase extends TestLogger { @Rule public final FlinkResource dist = new LocalStandaloneFlinkResourceFactory() - .create(FlinkResourceSetup.builder().build()) - .get(); + .create(FlinkResourceSetup.builder().build()); @Nullable private static ScheduledExecutorService scheduledExecutorService = null; diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java index 5bde088b06395f9db687be39ab08166b75ec04ad..c4207e013bbb89bd6c015373700fc252a0b74227 100644 --- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java @@ -152,7 +152,7 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder(); params.getBuilderSetup().accept(builder); builder.addConfiguration(getFlinkConfig(params.getInstantiationType())); - dist = new LocalStandaloneFlinkResourceFactory().create(builder.build()).get(); + dist = new LocalStandaloneFlinkResourceFactory().create(builder.build()); } @Rule