diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index b4ae3f5d322172c5b5c72c8d0af90b6913da7520..d035c033dbf0104fd80781e4e66a6ca6471ab575 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesisClient; @@ -29,6 +31,7 @@ import com.amazonaws.services.kinesis.model.LimitExceededException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -58,9 +61,6 @@ public class KinesisProxy { /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ private final AmazonKinesisClient kinesisClient; - /** The AWS region that this proxy will be making calls to */ - private final String regionId; - /** Configuration properties of this Flink Kinesis Connector */ private final Properties configProps; @@ -72,9 +72,14 @@ public class KinesisProxy { public KinesisProxy(Properties configProps) { this.configProps = checkNotNull(configProps); - this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION); - AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); - client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); + /* The AWS region that this proxy will be making calls to */ + String regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION); + // set Flink as a user agent + ClientConfiguration config = new ClientConfigurationFactory().getConfig(); + config.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector"); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), config); + + client.setRegion(Region.getRegion(Regions.fromName(regionId))); this.kinesisClient = client; }