diff --git a/docs/monitoring/large_state_tuning.md b/docs/monitoring/large_state_tuning.md index 9e1ecc7cb5883c3a9b1deb6a22695f7f672280e7..a356970121cc513d9a547571194eb078befc4b54 100644 --- a/docs/monitoring/large_state_tuning.md +++ b/docs/monitoring/large_state_tuning.md @@ -94,21 +94,11 @@ When a savepoint is manually triggered, it may be in process concurrently with a ## Tuning Network Buffers -The number of network buffers is a parameter that can currently have an effect on checkpointing at large scale. -The Flink community is working on eliminating that parameter in the next versions of Flink. - -The number of network buffers defines how much data a TaskManager can hold in-flight before back-pressure kicks in. -A very high number of network buffers means that a lot of data may be in the stream network channels when a checkpoint -is started. Because the checkpoint barriers travel with that data (see [description of how checkpointing works](../internals/stream_checkpointing.html)), -a lot of in-flight data means that the barriers have to wait for that data to be transported/processed before arriving -at the target operator. - -Having a lot of data in-flight also does not speed up the data processing as a whole. It only means that data is picked up faster -from the data source (log, files, message queue) and buffered longer in Flink. Having fewer network buffers means that -data is picked up from the source more immediately before it is actually being processed, which is generally desirable. -The number of network buffers should hence not be set arbitrarily large, but to a low multiple (such as 2x) of the -minimum number of required buffers. - +Before Flink 1.3, an increased number of network buffers also caused increased checkpointing times since +keeping more in-flight data meant that checkpoint barriers got delayed. Since Flink 1.3, the +number of network buffers used per outgoing/incoming channel is limited and thus network buffers +may be configured without affecting checkpoint times +(see [network buffer configuration](../setup/config.html#configuring-the-network-buffers)). ## Make state checkpointing Asynchronous where possible diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 8480045bbbcea7e9c89faf3a20b0b447f35a823a..bde564a0a88259d51bf93264d679ce9846162097 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -137,7 +137,7 @@ public class TaskManagerOptions { .defaultValue(1024L << 20); // 1 GB /** - * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). + * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). * * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization */ @@ -146,7 +146,7 @@ public class TaskManagerOptions { .defaultValue(2); /** - * Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). + * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ public static final ConfigOption NETWORK_EXTRA_BUFFERS_PER_GATE = key("taskmanager.network.memory.floating-buffers-per-gate") diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index cc4cb77b0be98349264d0e2fbe131c4de6ef9063..4269af640155146251c3bbda6a45756388ccdb39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -73,9 +73,9 @@ public class NetworkEnvironment { private final int partitionRequestMaxBackoff; - /** Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). */ + /** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */ private final int networkBuffersPerChannel; - /** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */ + /** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ private final int extraNetworkBuffersPerGate; private boolean isShutdown;