From 5558e7688e777853a23d94edc31971aa1e230d1e Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Mon, 23 Nov 2015 16:55:50 +0100 Subject: [PATCH] [FLINK-2996] Introduce configuration parameter for BlobServer port This closes #1394 --- docs/setup/config.md | 6 ++ .../flink/configuration/ConfigConstants.java | 13 +++ .../java/org/apache/flink/util/NetUtils.java | 30 ++++++ .../org/apache/flink/util/NetUtilsTest.java | 53 +++++++++ .../apache/flink/runtime/blob/BlobServer.java | 30 +++++- .../runtime/blob/BlobServerRangeTest.java | 102 ++++++++++++++++++ 6 files changed, 229 insertions(+), 5 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java diff --git a/docs/setup/config.md b/docs/setup/config.md index 299b3715ea7..8abcc036d7d 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -218,6 +218,12 @@ Note: State backend must be accessible from the JobManager, use file:// only for - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers. +- `blob.server.port`: Port definition for the blob server (serving user jars) on the TaskManagers. +By default the port is set to 0, which means that the operating system picks an ephemeral port. +Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. +It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running +on the same machine. + - `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values have to be specified as strings with a unit. 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d33154849a0..251ea9c4d95 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -81,6 +81,14 @@ public final class ConfigConstants { */ public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog"; + /** + * The config parameter defining the server port of the blob service. + * The value can be a single port, such as "9123", + * a range of ports: "50100-50200" + * or a list of ranges and/or single ports: "50100-50200,50300-50400,51234" + */ + public static final String BLOB_SERVER_PORT = "blob.server.port"; + /** * The config parameter defining the cleanup interval of the library cache manager. */ @@ -502,6 +510,11 @@ public final class ConfigConstants { */ public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000; + /** + * Default BLOB server port. 0 means ephemeral port. + */ + public static final String DEFAULT_BLOB_SERVER_PORT = "0"; + /** * The default network port the task manager expects incoming IPC connections. The {@code 0} means that * the TaskManager searches for a free port. 
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java index da445eceb39..0ba882020f8 100644 --- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java @@ -29,6 +29,8 @@ import java.net.MalformedURLException; import java.net.ServerSocket; import java.net.URL; import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Set; public class NetUtils { @@ -166,4 +168,32 @@ public class NetUtils { public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException { return ipAddressAndPortToUrlString(InetAddress.getByName(host), port); } + + /** + * Returns the set of ports described by the range definition (port availability is not checked). + * + * @param rangeDefinition String describing a single port, a range of ports or multiple ranges. + * @return Set of ports from the range definition + * @throws NumberFormatException If an invalid string is passed. 
+ */ + public static Set getPortRangeFromString(String rangeDefinition) throws NumberFormatException { + Set finalSet = new HashSet<>(); + final String[] ranges = rangeDefinition.trim().split(","); + for(String rawRange: ranges) { + String range = rawRange.trim(); + int dashIdx = range.indexOf('-'); + if (dashIdx == -1) { + // only one port in range: + finalSet.add(Integer.valueOf(range)); + } else { + // evaluate range + int start = Integer.valueOf(range.substring(0, dashIdx)); + int end = Integer.valueOf(range.substring(dashIdx+1, range.length())); + for(int i = start; i <= end; i++) { + finalSet.add(i); + } + } + } + return finalSet; + } } diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java index cd2c13b7e28..13a59fa69a1 100644 --- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java @@ -18,11 +18,15 @@ package org.apache.flink.util; +import org.junit.Assert; import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Set; +import static org.hamcrest.core.IsCollectionContaining.hasItems; +import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.*; public class NetUtilsTest { @@ -94,4 +98,53 @@ public class NetUtilsTest { fail(e.getMessage()); } } + + @Test + public void testFreePortRangeUtility() { + // inspired by Hadoop's example for "yarn.app.mapreduce.am.job.client.port-range" + String rangeDefinition = "50000-50050, 50100-50200,51234 "; // this also contains some whitespaces + Set ports = NetUtils.getPortRangeFromString(rangeDefinition); + Assert.assertEquals(51+101+1, ports.size()); + // check first range + Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050)); + // check second range and last point + Assert.assertThat(ports, hasItems(50100, 50101, 50110, 50200, 51234)); + // check that only ranges are included 
+ Assert.assertThat(ports, not(hasItems(50051, 50052, 1337, 50201, 49999, 50099))); + + + // test single port "range": + ports = NetUtils.getPortRangeFromString(" 51234"); + Assert.assertEquals(1, ports.size()); + Assert.assertEquals(51234, (int)ports.iterator().next()); + + // test port list + ports = NetUtils.getPortRangeFromString("5,1,2,3,4"); + Assert.assertEquals(5, ports.size()); + Assert.assertThat(ports, hasItems(1,2,3,4,5)); + + + Throwable error = null; + + // try some wrong values: String + try { NetUtils.getPortRangeFromString("localhost"); } catch(Throwable t) { error = t; } + Assert.assertTrue(error instanceof NumberFormatException); + error = null; + + // incomplete range + try { NetUtils.getPortRangeFromString("5-"); } catch(Throwable t) { error = t; } + Assert.assertTrue(error instanceof NumberFormatException); + error = null; + + // incomplete range + try { NetUtils.getPortRangeFromString("-5"); } catch(Throwable t) { error = t; } + Assert.assertTrue(error instanceof NumberFormatException); + error = null; + + // empty range + try { NetUtils.getPortRangeFromString(",5"); } catch(Throwable t) { error = t; } + Assert.assertTrue(error instanceof NumberFormatException); + error = null; + + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 851cff43333..f4b6c0013b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import java.net.ServerSocket; import java.net.URL; import 
java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -137,12 +139,30 @@ public class BlobServer extends Thread implements BlobService { this.shutdownHook = null; } - // start the server - try { - this.serverSocket = new ServerSocket(0, backlog); + // ----------------------- start the server ------------------- + + String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT); + Iterator ports = NetUtils.getPortRangeFromString(serverPortRange).iterator(); + + ServerSocket socketAttempt = null; + while(ports.hasNext()) { + int port = ports.next(); + LOG.debug("Trying to open socket on port {}", port); + try { + socketAttempt = new ServerSocket(port, backlog); + break; // we were able to use the port. + } catch (IOException | IllegalArgumentException e) { + if(LOG.isDebugEnabled()) { + LOG.debug("Unable to allocate socket on port", e); + } else { + LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage()); + } + } } - catch (IOException e) { - throw new IOException("Could not create BlobServer with automatic port choice.", e); + if(socketAttempt == null) { + throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange); + } else { + this.serverSocket = socketAttempt; } // start the server thread diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java new file mode 100644 index 00000000000..36ae8ccb5d6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + * Tests to ensure that the BlobServer properly starts on a specified range of available ports. 
+ */ +public class BlobServerRangeTest extends TestLogger { + /** + * Start blob server on 0 = pick an ephemeral port + */ + @Test + public void testOnEphemeralPort() throws IOException { + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0"); + BlobServer srv = new BlobServer(conf); + } + + /** + * Try allocating on an unavailable port + * @throws IOException + */ + @Test(expected = IOException.class) + public void testPortUnavailable() throws IOException { + // allocate on an ephemeral port + ServerSocket socket = null; + try { + socket = new ServerSocket(0); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("An exception was thrown while preparing the test " + e.getMessage()); + } + + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort())); + + // this thing is going to throw an exception + try { + BlobServer srv = new BlobServer(conf); + } finally { + socket.close(); + } + } + + /** + * Give the BlobServer a choice of three ports, where two of them + * are allocated + */ + @Test + public void testOnePortAvailable() throws IOException { + int numAllocated = 2; + ServerSocket[] sockets = new ServerSocket[numAllocated]; + for(int i = 0; i < numAllocated; i++) { + ServerSocket socket = null; + try { + sockets[i] = new ServerSocket(0); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("An exception was thrown while preparing the test " + e.getMessage()); + } + } + int availablePort = NetUtils.getAvailablePort(); + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); + + // the server is expected to skip the two occupied ports and bind to the remaining free one + try { + BlobServer srv = new BlobServer(conf); + Assert.assertEquals(availablePort, srv.getPort()); + srv.shutdown(); + } finally { + sockets[0].close(); + sockets[1].close(); + } + } 
+} -- GitLab