From c57e56f183bd923e6947c70f533a2919c888565b Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Wed, 6 Dec 2017 23:22:50 -0800 Subject: [PATCH] [FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis This closes #5130. --- .../FlinkKinesisConsumerMigrationTest.java | 9 +-- .../kinesis/FlinkKinesisConsumerTest.java | 12 +--- .../kinesis/FlinkKinesisProducerTest.java | 24 +++----- .../kinesis/testutils/TestUtils.java | 39 +++++++++++++ .../kinesis/util/KinesisConfigUtilTest.java | 58 ++++++++----------- 5 files changed, 76 insertions(+), 66 deletions(-) create mode 100644 flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java index 364560c40dd..ab9826e7c48 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -22,12 +22,12 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; @@ -213,12 +213,7 @@ public class FlinkKinesisConsumerMigrationTest { private KinesisDataFetcher mockFetcher; - private static Properties dummyConfig = new Properties(); - static { - dummyConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - dummyConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - dummyConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - } + private static Properties dummyConfig = TestUtils.getStandardProperties(); DummyFlinkKinesisConsumer(KinesisDataFetcher mockFetcher) { super("dummy-topic", mock(KinesisDeserializationSchema.class), dummyConfig); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index a354bb34b69..ea6347632d0 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; @@ -42,6 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; @@ -92,10 +92,7 @@ public class FlinkKinesisConsumerTest { @Test public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception { - Properties config = new Properties(); - config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + Properties config = TestUtils.getStandardProperties(); List> globalUnionState = new ArrayList<>(4); globalUnionState.add(Tuple2.of( @@ -155,10 +152,7 @@ public class FlinkKinesisConsumerTest { // ---------------------------------------------------------------------- // setup config, initial state and expected state snapshot // ---------------------------------------------------------------------- - Properties config = new Properties(); - config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + Properties config = TestUtils.getStandardProperties(); ArrayList> initialState = new ArrayList<>(1); initialState.add(Tuple2.of( diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java index 8351f8e092e..07c9cd7db74 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java @@ -24,8 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.ExceptionUtils; @@ -45,7 +45,6 @@ import org.mockito.stubbing.Answer; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; -import java.util.Properties; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -71,12 +70,12 @@ public class FlinkKinesisProducerTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided serialization schema is not serializable"); - new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties()); + new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), TestUtils.getStandardProperties()); } @Test public void testCreateWithSerializableDeserializer() { - new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties()); + new FlinkKinesisProducer<>(new SerializableSerializationSchema(), TestUtils.getStandardProperties()); } @Test @@ -84,19 +83,19 @@ public class FlinkKinesisProducerTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided custom partitioner is not serializable"); - new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) + new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties()) .setCustomPartitioner(new NonSerializableCustomPartitioner()); } @Test public void testConfigureWithSerializableCustomPartitioner() { - new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) + new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties()) .setCustomPartitioner(new SerializableCustomPartitioner()); } @Test public void testProducerIsSerializable() { - FlinkKinesisProducer consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()); + FlinkKinesisProducer consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties()); assertTrue(InstantiationUtil.isSerializable(consumer)); } @@ -350,7 +349,7 @@ public class FlinkKinesisProducerTest { private boolean isFlushed; DummyFlinkKinesisProducer(SerializationSchema schema) { - super(schema, getStandardProperties()); + super(schema, TestUtils.getStandardProperties()); setDefaultStream(DUMMY_STREAM); setDefaultPartition(DUMMY_PARTITION); @@ -440,13 +439,4 @@ public class FlinkKinesisProducerTest { return numPending; } } - - private static Properties getStandardProperties() { - Properties standardProps = new Properties(); - standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - return standardProps; - } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java new file mode 100644 index 00000000000..f6d0a442cee --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.testutils; + +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; + +import java.util.Properties; + +/** + * General test utils. + */ +public class TestUtils { + /** + * Get standard Kinesis-related config properties. + */ + public static Properties getStandardProperties() { + Properties config = new Properties(); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + + return config; + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java index dab6ea21f33..074b6760d2a 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import org.junit.Rule; @@ -171,7 +172,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid AWS Credential Provider Type"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType"); KinesisConfigUtil.validateAwsConfiguration(testConfig); @@ -186,7 +187,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid initial position in stream"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition"); @@ -199,7 +200,7 @@ public class KinesisConfigUtilTest { exception.expectMessage("Please set value for initial timestamp ('" + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); @@ -211,7 +212,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate"); @@ -224,7 +225,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0"); @@ -236,7 +237,7 @@ public class KinesisConfigUtilTest { public void testDateStringForValidateOptionDateProperty() { String timestamp = "2016-04-04T19:58:46.480-00:00"; - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); @@ -253,7 +254,7 @@ public class KinesisConfigUtilTest { public void testUnixTimestampForValidateOptionDateProperty() { String unixTimestamp = "1459799926.480"; - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); @@ -271,7 +272,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14"); @@ -285,7 +286,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable"); @@ -299,7 +300,7 @@ public class KinesisConfigUtilTest { String unixTimestamp = "2016-04-04"; String pattern = "yyyy-MM-dd"; - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); @@ -318,7 +319,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -329,7 +330,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -340,7 +341,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -351,7 +352,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -362,7 +363,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -373,7 +374,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get records operation base backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -384,7 +385,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get records operation max backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -395,7 +396,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get records operation backoff exponential constant"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -406,7 +407,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -417,7 +418,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -428,7 +429,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -439,7 +440,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -450,7 +451,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -461,18 +462,9 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); } - - private Properties getPropertiesWithRequiredFields() { - Properties config = new Properties(); - config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); - config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - return config; - } } -- GitLab