提交 123be227 编写于 作者: R Robert Metzger

[FLINK-4085][Kinesis] Set Flink-specific user agent

This closes #2175
上级 256c9c4d
......@@ -17,6 +17,8 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
......@@ -29,6 +31,7 @@ import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
......@@ -58,9 +61,6 @@ public class KinesisProxy {
/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
private final AmazonKinesisClient kinesisClient;
/** The AWS region that this proxy will be making calls to */
private final String regionId;
/** Configuration properties of this Flink Kinesis Connector */
private final Properties configProps;
......@@ -72,9 +72,14 @@ public class KinesisProxy {
public KinesisProxy(Properties configProps) {
this.configProps = checkNotNull(configProps);
this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
/* The AWS region that this proxy will be making calls to */
String regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
// set Flink as a user agent
ClientConfiguration config = new ClientConfigurationFactory().getConfig();
config.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), config);
client.setRegion(Region.getRegion(Regions.fromName(regionId)));
this.kinesisClient = client;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册