Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
c57e56f1
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c57e56f1
编写于
12月 06, 2017
作者:
B
Bowen Li
提交者:
Tzu-Li (Gordon) Tai
12月 17, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-8216] [kinesis] Unify test utils in flink-connector-kinesis
This closes #5130.
上级
a7465f04
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
76 addition
and
66 deletion
+76
-66
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
...connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+2
-7
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
...treaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+3
-9
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
...treaming/connectors/kinesis/FlinkKinesisProducerTest.java
+7
-17
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
...ink/streaming/connectors/kinesis/testutils/TestUtils.java
+39
-0
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
...eaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+25
-33
未找到文件。
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
浏览文件 @
c57e56f1
...
...
@@ -22,12 +22,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.functions.source.SourceFunction
;
import
org.apache.flink.streaming.api.operators.StreamSource
;
import
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
;
import
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
;
import
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
;
import
org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata
;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils
;
import
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles
;
import
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness
;
import
org.apache.flink.streaming.util.OperatorSnapshotUtil
;
...
...
@@ -213,12 +213,7 @@ public class FlinkKinesisConsumerMigrationTest {
private
KinesisDataFetcher
<
T
>
mockFetcher
;
private
static
Properties
dummyConfig
=
new
Properties
();
static
{
dummyConfig
.
setProperty
(
AWSConfigConstants
.
AWS_REGION
,
"us-east-1"
);
dummyConfig
.
setProperty
(
AWSConfigConstants
.
AWS_ACCESS_KEY_ID
,
"accessKeyId"
);
dummyConfig
.
setProperty
(
AWSConfigConstants
.
AWS_SECRET_ACCESS_KEY
,
"secretKey"
);
}
private
static
Properties
dummyConfig
=
TestUtils
.
getStandardProperties
();
DummyFlinkKinesisConsumer
(
KinesisDataFetcher
<
T
>
mockFetcher
)
{
super
(
"dummy-topic"
,
mock
(
KinesisDeserializationSchema
.
class
),
dummyConfig
);
...
...
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
浏览文件 @
c57e56f1
...
...
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import
org.apache.flink.runtime.state.StateInitializationContext
;
import
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl
;
import
org.apache.flink.streaming.api.functions.source.SourceFunction
;
import
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
;
import
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard
;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState
;
...
...
@@ -42,6 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import
org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata
;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer
;
import
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil
;
...
...
@@ -92,10 +92,7 @@ public class FlinkKinesisConsumerTest {
@Test
public
void
testUseRestoredStateForSnapshotIfFetcherNotInitialized
()
throws
Exception
{
Properties
config
=
new
Properties
();
config
.
setProperty
(
AWSConfigConstants
.
AWS_REGION
,
"us-east-1"
);
config
.
setProperty
(
AWSConfigConstants
.
AWS_ACCESS_KEY_ID
,
"accessKeyId"
);
config
.
setProperty
(
AWSConfigConstants
.
AWS_SECRET_ACCESS_KEY
,
"secretKey"
);
Properties
config
=
TestUtils
.
getStandardProperties
();
List
<
Tuple2
<
StreamShardMetadata
,
SequenceNumber
>>
globalUnionState
=
new
ArrayList
<>(
4
);
globalUnionState
.
add
(
Tuple2
.
of
(
...
...
@@ -155,10 +152,7 @@ public class FlinkKinesisConsumerTest {
// ----------------------------------------------------------------------
// setup config, initial state and expected state snapshot
// ----------------------------------------------------------------------
Properties
config
=
new
Properties
();
config
.
setProperty
(
AWSConfigConstants
.
AWS_REGION
,
"us-east-1"
);
config
.
setProperty
(
AWSConfigConstants
.
AWS_ACCESS_KEY_ID
,
"accessKeyId"
);
config
.
setProperty
(
AWSConfigConstants
.
AWS_SECRET_ACCESS_KEY
,
"secretKey"
);
Properties
config
=
TestUtils
.
getStandardProperties
();
ArrayList
<
Tuple2
<
StreamShardMetadata
,
SequenceNumber
>>
initialState
=
new
ArrayList
<>(
1
);
initialState
.
add
(
Tuple2
.
of
(
...
...
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
浏览文件 @
c57e56f1
...
...
@@ -24,8 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread;
import
org.apache.flink.core.testutils.MultiShotLatch
;
import
org.apache.flink.runtime.state.FunctionSnapshotContext
;
import
org.apache.flink.streaming.api.operators.StreamSink
;
import
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils
;
import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
;
import
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
;
import
org.apache.flink.util.ExceptionUtils
;
...
...
@@ -45,7 +45,6 @@ import org.mockito.stubbing.Answer;
import
java.nio.ByteBuffer
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Properties
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
import
static
org
.
mockito
.
Matchers
.
any
;
...
...
@@ -71,12 +70,12 @@ public class FlinkKinesisProducerTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"The provided serialization schema is not serializable"
);
new
FlinkKinesisProducer
<>(
new
NonSerializableSerializationSchema
(),
getStandardProperties
());
new
FlinkKinesisProducer
<>(
new
NonSerializableSerializationSchema
(),
TestUtils
.
getStandardProperties
());
}
@Test
public
void
testCreateWithSerializableDeserializer
()
{
new
FlinkKinesisProducer
<>(
new
SerializableSerializationSchema
(),
getStandardProperties
());
new
FlinkKinesisProducer
<>(
new
SerializableSerializationSchema
(),
TestUtils
.
getStandardProperties
());
}
@Test
...
...
@@ -84,19 +83,19 @@ public class FlinkKinesisProducerTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"The provided custom partitioner is not serializable"
);
new
FlinkKinesisProducer
<>(
new
SimpleStringSchema
(),
getStandardProperties
())
new
FlinkKinesisProducer
<>(
new
SimpleStringSchema
(),
TestUtils
.
getStandardProperties
())
.
setCustomPartitioner
(
new
NonSerializableCustomPartitioner
());
}
@Test
public
void
testConfigureWithSerializableCustomPartitioner
()
{
new
FlinkKinesisProducer
<>(
new
SimpleStringSchema
(),
getStandardProperties
())
new
FlinkKinesisProducer
<>(
new
SimpleStringSchema
(),
TestUtils
.
getStandardProperties
())
.
setCustomPartitioner
(
new
SerializableCustomPartitioner
());
}
@Test
public
void
testProducerIsSerializable
()
{
FlinkKinesisProducer
<
String
>
consumer
=
new
FlinkKinesisProducer
<>(
new
SimpleStringSchema
(),
getStandardProperties
());
FlinkKinesisProducer
<
String
>
consumer
=
new
FlinkKinesisProducer
<>(
new
SimpleStringSchema
(),
TestUtils
.
getStandardProperties
());
assertTrue
(
InstantiationUtil
.
isSerializable
(
consumer
));
}
...
...
@@ -350,7 +349,7 @@ public class FlinkKinesisProducerTest {
private
boolean
isFlushed
;
DummyFlinkKinesisProducer
(
SerializationSchema
<
T
>
schema
)
{
super
(
schema
,
getStandardProperties
());
super
(
schema
,
TestUtils
.
getStandardProperties
());
setDefaultStream
(
DUMMY_STREAM
);
setDefaultPartition
(
DUMMY_PARTITION
);
...
...
@@ -440,13 +439,4 @@ public class FlinkKinesisProducerTest {
return
numPending
;
}
}
private
static
Properties
getStandardProperties
()
{
Properties
standardProps
=
new
Properties
();
standardProps
.
setProperty
(
AWSConfigConstants
.
AWS_REGION
,
"us-east-1"
);
standardProps
.
setProperty
(
AWSConfigConstants
.
AWS_ACCESS_KEY_ID
,
"accessKeyId"
);
standardProps
.
setProperty
(
AWSConfigConstants
.
AWS_SECRET_ACCESS_KEY
,
"secretKey"
);
return
standardProps
;
}
}
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
0 → 100644
浏览文件 @
c57e56f1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.kinesis.testutils
;
import
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
;
import
java.util.Properties
;
/**
* General test utils.
*/
public
class
TestUtils
{
/**
* Get standard Kinesis-related config properties.
*/
public
static
Properties
getStandardProperties
()
{
Properties
config
=
new
Properties
();
config
.
setProperty
(
AWSConfigConstants
.
AWS_REGION
,
"us-east-1"
);
config
.
setProperty
(
AWSConfigConstants
.
AWS_ACCESS_KEY_ID
,
"accessKeyId"
);
config
.
setProperty
(
AWSConfigConstants
.
AWS_SECRET_ACCESS_KEY
,
"secretKey"
);
return
config
;
}
}
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
浏览文件 @
c57e56f1
...
...
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
import
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants
;
import
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants
;
import
org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils
;
import
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration
;
import
org.junit.Rule
;
...
...
@@ -171,7 +172,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid AWS Credential Provider Type"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
AWSConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"wrongProviderType"
);
KinesisConfigUtil
.
validateAwsConfiguration
(
testConfig
);
...
...
@@ -186,7 +187,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid initial position in stream"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"wrongInitPosition"
);
...
...
@@ -199,7 +200,7 @@ public class KinesisConfigUtilTest {
exception
.
expectMessage
(
"Please set value for initial timestamp ('"
+
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
+
"') when using AT_TIMESTAMP initial position."
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
...
...
@@ -211,7 +212,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
"unparsableDate"
);
...
...
@@ -224,7 +225,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
"-1.0"
);
...
...
@@ -236,7 +237,7 @@ public class KinesisConfigUtilTest {
public
void
testDateStringForValidateOptionDateProperty
()
{
String
timestamp
=
"2016-04-04T19:58:46.480-00:00"
;
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
timestamp
);
...
...
@@ -253,7 +254,7 @@ public class KinesisConfigUtilTest {
public
void
testUnixTimestampForValidateOptionDateProperty
()
{
String
unixTimestamp
=
"1459799926.480"
;
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
unixTimestamp
);
...
...
@@ -271,7 +272,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
"2016-03-14"
);
...
...
@@ -285,7 +286,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
"stillUnparsable"
);
...
...
@@ -299,7 +300,7 @@ public class KinesisConfigUtilTest {
String
unixTimestamp
=
"2016-04-04"
;
String
pattern
=
"yyyy-MM-dd"
;
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
AWS_CREDENTIALS_PROVIDER
,
"BASIC"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_POSITION
,
"AT_TIMESTAMP"
);
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_INITIAL_TIMESTAMP
,
unixTimestamp
);
...
...
@@ -318,7 +319,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for describe stream operation base backoff milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_DESCRIBE_BACKOFF_BASE
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -329,7 +330,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for describe stream operation max backoff milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_DESCRIBE_BACKOFF_MAX
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -340,7 +341,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for describe stream operation backoff exponential constant"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT
,
"unparsableDouble"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -351,7 +352,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for maximum retry attempts for getRecords shard operation"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETRECORDS_RETRIES
,
"unparsableInt"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -362,7 +363,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for maximum records per getRecords shard operation"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETRECORDS_MAX
,
"unparsableInt"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -373,7 +374,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for get records operation base backoff milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETRECORDS_BACKOFF_BASE
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -384,7 +385,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for get records operation max backoff milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETRECORDS_BACKOFF_MAX
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -395,7 +396,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for get records operation backoff exponential constant"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT
,
"unparsableDouble"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -406,7 +407,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for getRecords sleep interval in milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETRECORDS_INTERVAL_MILLIS
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -417,7 +418,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for maximum retry attempts for getShardIterator shard operation"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETITERATOR_RETRIES
,
"unparsableInt"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -428,7 +429,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for get shard iterator operation base backoff milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETITERATOR_BACKOFF_BASE
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -439,7 +440,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for get shard iterator operation max backoff milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETITERATOR_BACKOFF_MAX
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -450,7 +451,7 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for get shard iterator operation backoff exponential constant"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT
,
"unparsableDouble"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
...
...
@@ -461,18 +462,9 @@ public class KinesisConfigUtilTest {
exception
.
expect
(
IllegalArgumentException
.
class
);
exception
.
expectMessage
(
"Invalid value given for shard discovery sleep interval in milliseconds"
);
Properties
testConfig
=
getPropertiesWithRequiredField
s
();
Properties
testConfig
=
TestUtils
.
getStandardPropertie
s
();
testConfig
.
setProperty
(
ConsumerConfigConstants
.
SHARD_DISCOVERY_INTERVAL_MILLIS
,
"unparsableLong"
);
KinesisConfigUtil
.
validateConsumerConfiguration
(
testConfig
);
}
private
Properties
getPropertiesWithRequiredFields
()
{
Properties
config
=
new
Properties
();
config
.
setProperty
(
ConsumerConfigConstants
.
AWS_REGION
,
"us-east-1"
);
config
.
setProperty
(
ConsumerConfigConstants
.
AWS_ACCESS_KEY_ID
,
"accessKeyId"
);
config
.
setProperty
(
ConsumerConfigConstants
.
AWS_SECRET_ACCESS_KEY
,
"secretKey"
);
return
config
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录