diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java index 364560c40ddd3349e71b8628f56fe38c7cc018ed..ab9826e7c487afbdb67ecd53a031744fff4ad940 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -22,12 +22,12 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; @@ -213,12 +213,7 @@ public class FlinkKinesisConsumerMigrationTest { private KinesisDataFetcher mockFetcher; - private static Properties dummyConfig = new Properties(); - static { - dummyConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - dummyConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - dummyConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - } + private static Properties dummyConfig = TestUtils.getStandardProperties(); DummyFlinkKinesisConsumer(KinesisDataFetcher mockFetcher) { super("dummy-topic", mock(KinesisDeserializationSchema.class), dummyConfig); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index a354bb34b69961be8b49ac40da4b1cf1388e4576..ea6347632d0fb395c9216cd9fb96e9e17524ccd6 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; @@ -42,6 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; @@ -92,10 +92,7 @@ public class FlinkKinesisConsumerTest { @Test public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception { - Properties config = new Properties(); - config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + Properties config = TestUtils.getStandardProperties(); List> globalUnionState = new ArrayList<>(4); globalUnionState.add(Tuple2.of( @@ -155,10 +152,7 @@ public class FlinkKinesisConsumerTest { // ---------------------------------------------------------------------- // setup config, initial state and expected state snapshot // ---------------------------------------------------------------------- - Properties config = new Properties(); - config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + Properties config = TestUtils.getStandardProperties(); ArrayList> initialState = new ArrayList<>(1); initialState.add(Tuple2.of( diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java index 8351f8e092e25ba588f6c2b6059ec981716edfdd..07c9cd7db745519e2c56a44ade50a5a9a8f177b4 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java @@ -24,8 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.ExceptionUtils; @@ -45,7 +45,6 @@ import org.mockito.stubbing.Answer; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; -import java.util.Properties; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -71,12 +70,12 @@ public class FlinkKinesisProducerTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided serialization schema is not serializable"); - new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties()); + new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), TestUtils.getStandardProperties()); } @Test public void testCreateWithSerializableDeserializer() { - new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties()); + new FlinkKinesisProducer<>(new SerializableSerializationSchema(), TestUtils.getStandardProperties()); } @Test @@ -84,19 +83,19 @@ public class FlinkKinesisProducerTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided custom partitioner is not serializable"); - new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) + new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties()) .setCustomPartitioner(new NonSerializableCustomPartitioner()); } @Test public void testConfigureWithSerializableCustomPartitioner() { - new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) + new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties()) .setCustomPartitioner(new SerializableCustomPartitioner()); } @Test public void testProducerIsSerializable() { - FlinkKinesisProducer consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()); + FlinkKinesisProducer consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), TestUtils.getStandardProperties()); assertTrue(InstantiationUtil.isSerializable(consumer)); } @@ -350,7 +349,7 @@ public class FlinkKinesisProducerTest { private boolean isFlushed; DummyFlinkKinesisProducer(SerializationSchema schema) { - super(schema, getStandardProperties()); + super(schema, TestUtils.getStandardProperties()); setDefaultStream(DUMMY_STREAM); setDefaultPartition(DUMMY_PARTITION); @@ -440,13 +439,4 @@ public class FlinkKinesisProducerTest { return numPending; } } - - private static Properties getStandardProperties() { - Properties standardProps = new Properties(); - standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - return standardProps; - } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..f6d0a442ceeaee819e0d628e6c96abdc632c60c9 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.testutils; + +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; + +import java.util.Properties; + +/** + * General test utils. + */ +public class TestUtils { + /** + * Get standard Kinesis-related config properties. + */ + public static Properties getStandardProperties() { + Properties config = new Properties(); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + + return config; + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java index dab6ea21f33ab22ae9e1d34ba12b1558f8b28a15..074b6760d2a4d93f7d32cdbf80a372f9c3885d63 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import org.junit.Rule; @@ -171,7 +172,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid AWS Credential Provider Type"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType"); KinesisConfigUtil.validateAwsConfiguration(testConfig); @@ -186,7 +187,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid initial position in stream"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition"); @@ -199,7 +200,7 @@ public class KinesisConfigUtilTest { exception.expectMessage("Please set value for initial timestamp ('" + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); @@ -211,7 +212,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate"); @@ -224,7 +225,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0"); @@ -236,7 +237,7 @@ public class KinesisConfigUtilTest { public void testDateStringForValidateOptionDateProperty() { String timestamp = "2016-04-04T19:58:46.480-00:00"; - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); @@ -253,7 +254,7 @@ public class KinesisConfigUtilTest { public void testUnixTimestampForValidateOptionDateProperty() { String unixTimestamp = "1459799926.480"; - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); @@ -271,7 +272,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14"); @@ -285,7 +286,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable"); @@ -299,7 +300,7 @@ public class KinesisConfigUtilTest { String unixTimestamp = "2016-04-04"; String pattern = "yyyy-MM-dd"; - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); @@ -318,7 +319,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -329,7 +330,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -340,7 +341,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -351,7 +352,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -362,7 +363,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -373,7 +374,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get records operation base backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -384,7 +385,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get records operation max backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -395,7 +396,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get records operation backoff exponential constant"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -406,7 +407,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -417,7 +418,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -428,7 +429,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -439,7 +440,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -450,7 +451,7 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); @@ -461,18 +462,9 @@ public class KinesisConfigUtilTest { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds"); - Properties testConfig = getPropertiesWithRequiredFields(); + Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); } - - private Properties getPropertiesWithRequiredFields() { - Properties config = new Properties(); - config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); - config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - return config; - } }