提交 637dde88 编写于 作者: B Bowen Li 提交者: Tzu-Li (Gordon) Tai

[FLINK-7508] [kinesis] Switch to POOLED ThreadingMode in FlinkKinesisProducer

KinesisProducerLibrary (KPL) 0.10.x had been using a
One-New-Thread-Per-Request model for all requests sent to AWS Kinesis,
which is very expensive.

0.12.4 introduced a new ThreadingMode - Pooled, which will use a thread
pool. This hugely improves KPL's performance and reduces consumed
resources. By default, KPL still uses per-request mode. We should
explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This closes #4656.
上级 345de772
......@@ -279,6 +279,10 @@ producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");
producerConfig.put("ThreadPoolSize", "15");
// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST");
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
......@@ -301,6 +305,10 @@ producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");
producerConfig.put("ThreadPoolSize", "15");
// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST");
val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
kinesis.setFailOnError(true);
......@@ -321,6 +329,11 @@ Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSche
done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
Otherwise, the returned stream name is used.
### Threading Model
Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool mode uses a queue and a thread pool to execute requests to Kinesis. This limits the number of threads that KPL's native process may create, and therefore greatly lowers CPU utilization and improves efficiency. **Thus, we highly recommend that Flink users use the thread-pool model.** The default thread pool size is `10`. Users can set the pool size in the `java.util.Properties` instance with the key `ThreadPoolSize`, as shown in the example above.
Users can still switch back to the one-thread-per-request mode by setting the key `ThreadingModel` to the value `PER_REQUEST` in the `java.util.Properties` instance, as shown in the commented-out code in the example above.
## Using Non-AWS Kinesis Endpoints for Testing
......
......@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.InstantiationUtil;
......@@ -171,8 +170,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
super.open(parameters);
// check and pass the configuration properties
KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
producer = new KinesisProducer(producerConfig);
callback = new FutureCallback<UserRecordResult>() {
......
......@@ -50,10 +50,23 @@ public class KinesisConfigUtil {
* The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw
* RateLimitExceededException too frequently and breaks Flink sink as a result.
**/
private static final String RATE_LIMIT = "RateLimit";
protected static final String RATE_LIMIT = "RateLimit";
/**
* The threading model that KinesisProducer will use.
**/
protected static final String THREADING_MODEL = "ThreadingModel";
/**
* The maximum number of threads that the native process' thread pool will be configured with.
**/
protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
/** Default values for RateLimit. **/
private static final String DEFAULT_RATE_LIMIT = "100";
protected static final String DEFAULT_RATE_LIMIT = "100";
/** Default values for ThreadPoolSize. **/
protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
......@@ -164,9 +177,10 @@ public class KinesisConfigUtil {
}
/**
* Validate configuration properties for {@link FlinkKinesisProducer}.
* Validate configuration properties for {@link FlinkKinesisProducer},
* and return a constructed KinesisProducerConfiguration.
*/
public static KinesisProducerConfiguration validateProducerConfiguration(Properties config) {
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
checkNotNull(config, "config can not be null");
validateAwsConfiguration(config);
......@@ -176,7 +190,27 @@ public class KinesisConfigUtil {
config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
}
return KinesisProducerConfiguration.fromProperties(config);
KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
// Because of bug https://github.com/awslabs/amazon-kinesis-producer/issues/124
// KPL cannot set ThreadingModel and ThreadPoolSize using Java reflection
// Thus we have to set them explicitly
if (config.containsKey(THREADING_MODEL)) {
kpc.setThreadingModel(
KinesisProducerConfiguration.ThreadingModel.valueOf(config.getProperty(THREADING_MODEL)));
} else {
kpc.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
}
if (config.containsKey(THREAD_POOL_SIZE)) {
kpc.setThreadPoolSize(Integer.parseInt(config.getProperty(THREAD_POOL_SIZE)));
} else {
kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
}
return kpc;
}
/**
......
......@@ -17,10 +17,11 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
......@@ -36,7 +37,7 @@ import static org.junit.Assert.assertEquals;
* Tests for KinesisConfigUtil.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
@PrepareForTest(KinesisConfigUtil.class)
public class KinesisConfigUtilTest {
@Rule
private ExpectedException exception = ExpectedException.none();
......@@ -50,7 +51,66 @@ public class KinesisConfigUtilTest {
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
testConfig.setProperty("RateLimit", "unparsableLong");
KinesisConfigUtil.validateProducerConfiguration(testConfig);
KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
}
/**
 * Checks that a producer configuration built from properties without a
 * {@code RateLimit} entry reports a rate limit of 100.
 */
@Test
public void testDefaultRateLimitInProducerConfiguration() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // No RateLimit key is set, so the value read back is whatever the utility filled in.
    final KinesisProducerConfiguration producerConfiguration =
        KinesisConfigUtil.getValidatedProducerConfiguration(props);

    assertEquals(100, producerConfiguration.getRateLimit());
}
/**
 * Checks that an explicitly supplied {@code RateLimit} of 150 is carried
 * through to the resulting producer configuration.
 */
@Test
public void testCustomizedRateLimitInProducerConfiguration() {
    final Properties props = new Properties();
    props.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    final KinesisProducerConfiguration producerConfiguration =
        KinesisConfigUtil.getValidatedProducerConfiguration(props);

    assertEquals(150, producerConfiguration.getRateLimit());
}
/**
 * Checks that the producer configuration uses the {@code POOLED} threading
 * model when the properties do not contain a {@code ThreadingModel} entry.
 */
@Test
public void testDefaultThreadingModelInProducerConfiguration() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    final KinesisProducerConfiguration producerConfiguration =
        KinesisConfigUtil.getValidatedProducerConfiguration(props);

    assertEquals(
        KinesisProducerConfiguration.ThreadingModel.POOLED,
        producerConfiguration.getThreadingModel());
}
/**
 * Checks that setting {@code ThreadingModel} to {@code PER_REQUEST} in the
 * properties yields a producer configuration with that threading model.
 */
@Test
public void testCustomizedThreadingModelInProducerConfiguration() {
    final Properties props = new Properties();
    props.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    final KinesisProducerConfiguration producerConfiguration =
        KinesisConfigUtil.getValidatedProducerConfiguration(props);

    assertEquals(
        KinesisProducerConfiguration.ThreadingModel.PER_REQUEST,
        producerConfiguration.getThreadingModel());
}
/**
 * Checks that the producer configuration reports a thread pool size of 10
 * when the properties do not contain a {@code ThreadPoolSize} entry.
 */
@Test
public void testDefaultThreadPoolSizeInProducerConfiguration() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    final KinesisProducerConfiguration producerConfiguration =
        KinesisConfigUtil.getValidatedProducerConfiguration(props);

    assertEquals(10, producerConfiguration.getThreadPoolSize());
}
/**
 * Checks that an explicitly supplied {@code ThreadPoolSize} of 12 is carried
 * through to the resulting producer configuration.
 */
@Test
public void testCustomizedThreadPoolSizeInProducerConfiguration() {
    final Properties props = new Properties();
    props.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    final KinesisProducerConfiguration producerConfiguration =
        KinesisConfigUtil.getValidatedProducerConfiguration(props);

    assertEquals(12, producerConfiguration.getThreadPoolSize());
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册