提交 5558e768 编写于 作者: R Robert Metzger

[FLINK-2996] Introduce configuration parameter for BlobServer port

This closes #1394
上级 d2b4391f
......@@ -218,6 +218,12 @@ Note: State backend must be accessible from the JobManager, use file:// only for
- `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
- `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers.
By default the port is set to 0, which means that the operating system is picking an ephemeral port.
Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both.
It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running
on the same machine.
- `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values
have to be specified as strings with a unit.
......
......@@ -81,6 +81,14 @@ public final class ConfigConstants {
*/
public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";
/**
* The config parameter defining the server port of the blob service.
* The port can either be a port, such as "9123",
* a range of ports: "50100-50200"
* or a list of ranges and or points: "50100-50200,50300-50400,51234"
*/
public static final String BLOB_SERVER_PORT = "blob.server.port";
/**
* The config parameter defining the cleanup interval of the library cache manager.
*/
......@@ -502,6 +510,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
/**
* Default BLOB server port. 0 means ephemeral port.
*/
public static final String DEFAULT_BLOB_SERVER_PORT = "0";
/**
* The default network port the task manager expects incoming IPC connections. The {@code 0} means that
* the TaskManager searches for a free port.
......
......@@ -29,6 +29,8 @@ import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
public class NetUtils {
......@@ -166,4 +168,32 @@ public class NetUtils {
public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
}
/**
* Returns a set of available ports defined by the range definition.
*
* @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
* @return Set of ports from the range definition
* @throws NumberFormatException If an invalid string is passed.
*/
public static Set<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
Set<Integer> finalSet = new HashSet<>();
final String[] ranges = rangeDefinition.trim().split(",");
for(String rawRange: ranges) {
String range = rawRange.trim();
int dashIdx = range.indexOf('-');
if (dashIdx == -1) {
// only one port in range:
finalSet.add(Integer.valueOf(range));
} else {
// evaluate range
int start = Integer.valueOf(range.substring(0, dashIdx));
int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
for(int i = start; i <= end; i++) {
finalSet.add(i);
}
}
}
return finalSet;
}
}
......@@ -18,11 +18,15 @@
package org.apache.flink.util;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import static org.hamcrest.core.IsCollectionContaining.hasItems;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.*;
public class NetUtilsTest {
......@@ -94,4 +98,53 @@ public class NetUtilsTest {
fail(e.getMessage());
}
}
@Test
public void testFreePortRangeUtility() {
// inspired by Hadoop's example for "yarn.app.mapreduce.am.job.client.port-range"
String rangeDefinition = "50000-50050, 50100-50200,51234 "; // this also contains some whitespaces
Set<Integer> ports = NetUtils.getPortRangeFromString(rangeDefinition);
Assert.assertEquals(51+101+1, ports.size());
// check first range
Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050));
// check second range and last point
Assert.assertThat(ports, hasItems(50100, 50101, 50110, 50200, 51234));
// check that only ranges are included
Assert.assertThat(ports, not(hasItems(50051, 50052, 1337, 50201, 49999, 50099)));
// test single port "range":
ports = NetUtils.getPortRangeFromString(" 51234");
Assert.assertEquals(1, ports.size());
Assert.assertEquals(51234, (int)ports.iterator().next());
// test port list
ports = NetUtils.getPortRangeFromString("5,1,2,3,4");
Assert.assertEquals(5, ports.size());
Assert.assertThat(ports, hasItems(1,2,3,4,5));
Throwable error = null;
// try some wrong values: String
try { NetUtils.getPortRangeFromString("localhost"); } catch(Throwable t) { error = t; }
Assert.assertTrue(error instanceof NumberFormatException);
error = null;
// incomplete range
try { NetUtils.getPortRangeFromString("5-"); } catch(Throwable t) { error = t; }
Assert.assertTrue(error instanceof NumberFormatException);
error = null;
// incomplete range
try { NetUtils.getPortRangeFromString("-5"); } catch(Throwable t) { error = t; }
Assert.assertTrue(error instanceof NumberFormatException);
error = null;
// empty range
try { NetUtils.getPortRangeFromString(",5"); } catch(Throwable t) { error = t; }
Assert.assertTrue(error instanceof NumberFormatException);
error = null;
}
}
......@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,6 +34,7 @@ import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -137,12 +139,30 @@ public class BlobServer extends Thread implements BlobService {
this.shutdownHook = null;
}
// start the server
try {
this.serverSocket = new ServerSocket(0, backlog);
// ----------------------- start the server -------------------
String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange).iterator();
ServerSocket socketAttempt = null;
while(ports.hasNext()) {
int port = ports.next();
LOG.debug("Trying to open socket on port {}", port);
try {
socketAttempt = new ServerSocket(port, backlog);
break; // we were able to use the port.
} catch (IOException | IllegalArgumentException e) {
if(LOG.isDebugEnabled()) {
LOG.debug("Unable to allocate socket on port", e);
} else {
LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
}
}
}
catch (IOException e) {
throw new IOException("Could not create BlobServer with automatic port choice.", e);
if(socketAttempt == null) {
throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
} else {
this.serverSocket = socketAttempt;
}
// start the server thread
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.ServerSocket;
/**
* Tests to ensure that the BlobServer properly starts on a specified range of available ports.
*/
public class BlobServerRangeTest extends TestLogger {
/**
* Start blob server on 0 = pick an ephemeral port
*/
@Test
public void testOnEphemeralPort() throws IOException {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
BlobServer srv = new BlobServer(conf);
}
/**
* Try allocating on an unavailable port
* @throws IOException
*/
@Test(expected = IOException.class)
public void testPortUnavailable() throws IOException {
// allocate on an ephemeral port
ServerSocket socket = null;
try {
socket = new ServerSocket(0);
} catch (IOException e) {
e.printStackTrace();
Assert.fail("An exception was thrown while preparing the test " + e.getMessage());
}
Configuration conf = new Configuration();
conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort()));
// this thing is going to throw an exception
try {
BlobServer srv = new BlobServer(conf);
} finally {
socket.close();
}
}
/**
* Give the BlobServer a choice of three ports, where two of them
* are allocated
*/
@Test
public void testOnePortAvailable() throws IOException {
int numAllocated = 2;
ServerSocket[] sockets = new ServerSocket[numAllocated];
for(int i = 0; i < numAllocated; i++) {
ServerSocket socket = null;
try {
sockets[i] = new ServerSocket(0);
} catch (IOException e) {
e.printStackTrace();
Assert.fail("An exception was thrown while preparing the test " + e.getMessage());
}
}
int availablePort = NetUtils.getAvailablePort();
Configuration conf = new Configuration();
conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
// this thing is going to throw an exception
try {
BlobServer srv = new BlobServer(conf);
Assert.assertEquals(availablePort, srv.getPort());
srv.shutdown();
} finally {
sockets[0].close();
sockets[1].close();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册