Commit 59eab454 authored by Tzu-Li (Gordon) Tai

[FLINK-7363] [kinesis] Clean up deprecation of ProducerConfigConstants

- Improve deprecation message in Javadocs
- Remove usage of ProducerConfigConstants in code wherever possible
- Remove usage of ProducerConfigConstants in documentation code snippets

This closes #4473.
Parent 9ed5d9a1
@@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the stream needs access to the Cl
 {% highlight java %}
 Properties producerConfig = new Properties();
 // Required configs
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
 // Optional configs
 producerConfig.put("AggregationMaxCount", "4294967295");
 producerConfig.put("CollectionMaxCount", "1000");
@@ -293,10 +293,10 @@ simpleStringStream.addSink(kinesis);
 {% highlight scala %}
 val producerConfig = new Properties();
 // Required configs
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-// Optional configs
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional KPL configs
 producerConfig.put("AggregationMaxCount", "4294967295");
 producerConfig.put("CollectionMaxCount", "1000");
 producerConfig.put("RecordTtl", "30000");
@@ -328,29 +328,29 @@ It is sometimes desirable to have Flink operate as a consumer or producer agains
 [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
 application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
-To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
-Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
+To override the AWS endpoint, taking the producer for example, set the `AWSConfigConstants.AWS_ENDPOINT` property in the
+Flink configuration, in addition to the `AWSConfigConstants.AWS_REGION` required by Flink. Although the region is
 required, it will not be used to determine the AWS endpoint URL.
-The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
+The following example shows how one might supply the `AWSConfigConstants.AWS_ENDPOINT` configuration property:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
 {% endhighlight %}
 </div>
 </div>
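Beyond the diff itself, here is a minimal self-contained sketch of how this endpoint override could be wired into a complete producer job running against a local Kinesalite instance. The stream name, partition key, sample records, and job name are illustrative placeholders and are not part of the commit:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesaliteProducerExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties producerConfig = new Properties();
		// The region is still required, but the endpoint override decides where requests are sent.
		producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
		producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
		producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
		producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");

		FlinkKinesisProducer<String> kinesis =
				new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
		kinesis.setDefaultStream("test-stream");   // placeholder stream name
		kinesis.setDefaultPartition("0");

		DataStream<String> records = env.fromElements("record-1", "record-2", "record-3");
		records.addSink(kinesis);

		env.execute("Kinesalite producer example");
	}
}
```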
@@ -20,24 +20,31 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 /**
- * @deprecated
- *
  * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
+ *
+ * @deprecated This class is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ * See <a href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">
+ * here</a> for the full list of available configs.
+ * For configuring the region and credentials, please use the keys in {@link AWSConfigConstants}.
  */
 @Deprecated
 public class ProducerConfigConstants extends AWSConfigConstants {
 /**
- * @deprecated
+ * Deprecated key.
  *
- * Deprecated key. **/
+ * @deprecated This is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ * Please use {@code CollectionMaxCount} instead.
+ **/
 @Deprecated
 public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
 /**
- * @deprecated
+ * Deprecated key.
  *
- * Deprecated key. **/
+ * @deprecated This is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ * Please use {@code AggregationMaxCount} instead.
+ **/
 @Deprecated
 public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
 }
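To make the deprecation above concrete, a short sketch (values are arbitrary examples) contrasting the deprecated constants with the now-preferred approach of passing the official KPL keys directly, as the new Javadoc suggests:

```java
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");

// Deprecated: connector-specific keys that the connector has to translate internally.
producerConfig.put(ProducerConfigConstants.COLLECTION_MAX_COUNT, "1000");
producerConfig.put(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "4294967295");

// Preferred: pass the official KPL configuration keys directly,
// as listed in the KPL's default_config.properties.
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("AggregationMaxCount", "4294967295");
```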
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -43,9 +43,9 @@ public class ProduceIntoKinesis {
 DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
 Properties kinesisProducerConfig = new Properties();
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 new SimpleStringSchema(), kinesisProducerConfig);
...
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -56,9 +56,9 @@ public class ManualConsumerProducerTest {
 DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
 Properties kinesisProducerConfig = new Properties();
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 new KinesisSerializationSchema<String>() {
...
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
@@ -53,9 +53,9 @@ public class ManualProducerTest {
 DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
 Properties kinesisProducerConfig = new Properties();
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 new KinesisSerializationSchema<String>() {
...
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KinesisConfigUtilTest {
 exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
 Properties testConfig = new Properties();
-testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 testConfig.setProperty("RateLimit", "unparsableLong");
 KinesisConfigUtil.validateProducerConfiguration(testConfig);
@@ -55,7 +56,8 @@
 @Test
 public void testReplaceDeprecatedKeys() {
 Properties testConfig = new Properties();
-testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+// these deprecated keys should be replaced
 testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
 testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
 Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);
...
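The assertions of testReplaceDeprecatedKeys are outside this hunk. Assuming replaceDeprecatedProducerKeys rewrites the deprecated constants to the KPL key names the new Javadoc points to (CollectionMaxCount and AggregationMaxCount), the verification would look roughly like this sketch:

```java
// Hypothetical continuation of testReplaceDeprecatedKeys(); the real assertions
// are not part of this hunk and may differ.
assertEquals("1", replacedConfig.getProperty("AggregationMaxCount"));
assertEquals("2", replacedConfig.getProperty("CollectionMaxCount"));
```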