提交 62cb954d 编写于 作者: Z zentol

[FLINK-4087] [metrics] Improved JMX port handling

This closes #2145
上级 19ff8db6
......@@ -303,3 +303,18 @@ Open Font License (OFT) - http://scripts.sil.org/OFL
- Font Awesome (http://fortawesome.github.io/Font-Awesome/) - Created by Dave Gandy
-> fonts in "flink-runtime-web/web-dashboard/assets/fonts"
-----------------------------------------------------------------------
The ISC License
-----------------------------------------------------------------------
The Apache Flink project contains code under the ISC license from the following files:
- simplejmx (http://256.com/sources/simplejmx/) Copyright (c) - Gray Watson
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING
ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
USE OR PERFORMANCE OF THIS SOFTWARE.
......@@ -51,6 +51,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -67,6 +68,7 @@ import static org.junit.Assert.assertTrue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class RocksDBAsyncKVSnapshotTest {
......
......@@ -49,6 +49,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -67,6 +68,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class BoltWrapperTest extends AbstractTest {
@Test(expected = IllegalArgumentException.class)
......
......@@ -35,6 +35,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -52,6 +53,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class SpoutWrapperTest extends AbstractTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
......
......@@ -51,9 +51,9 @@ import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@PowerMockIgnore("javax.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class WrapperSetupHelperTest extends AbstractTest {
@Test
......
......@@ -46,6 +46,8 @@ public class MetricRegistry {
// configuration keys
// ------------------------------------------------------------------------
public static final String KEY_METRICS_JMX_PORT = "metrics.jmx.port";
public static final String KEY_METRICS_REPORTER_CLASS = "metrics.reporter.class";
public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
......@@ -87,7 +89,7 @@ public class MetricRegistry {
if (className == null) {
// by default, create JMX metrics
LOG.info("No metrics reporter configured, exposing metrics via JMX");
this.reporter = new JMXReporter();
this.reporter = startJmxReporter(config);
this.executor = null;
}
else {
......@@ -125,9 +127,9 @@ public class MetricRegistry {
}
}
catch (Throwable t) {
reporter = new JMXReporter();
shutdownExecutor();
LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
reporter = startJmxReporter(config);
}
this.reporter = reporter;
......@@ -135,6 +137,23 @@ public class MetricRegistry {
}
}
private static JMXReporter startJmxReporter(Configuration config) {
JMXReporter reporter = null;
try {
Configuration reporterConfig = new Configuration();
String portRange = config.getString(KEY_METRICS_JMX_PORT, null);
if (portRange != null) {
reporterConfig.setString(KEY_METRICS_JMX_PORT, portRange);
}
reporter = new JMXReporter();
reporter.open(reporterConfig);
} catch (Exception e) {
LOG.error("Failed to instantiate JMX reporter.", e);
} finally {
return reporter;
}
}
/**
* Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
*/
......
......@@ -24,7 +24,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,10 +34,22 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.rmi.NoSuchObjectException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
*
......@@ -60,6 +72,9 @@ public class JMXReporter implements MetricReporter {
/** The names under which the registered metrics have been added to the MBeanServer */
private final Map<Metric, ObjectName> registeredMetrics;
/** The server to which JMX clients connect to. ALlows for better control over port usage. */
private JMXServer jmxServer;
/**
* Creates a new JMXReporter
*/
......@@ -73,10 +88,42 @@ public class JMXReporter implements MetricReporter {
// ------------------------------------------------------------------------
@Override
public void open(Configuration config) {}
public void open(Configuration config) {
this.jmxServer = startJmxServer(config);
}
private static JMXServer startJmxServer(Configuration config) {
Iterator<Integer> ports = NetUtils.getPortRangeFromString(config.getString(KEY_METRICS_JMX_PORT, "9010-9025"));
JMXServer server = new JMXServer();
while (ports.hasNext()) {
int port = ports.next();
try {
server.start(port);
LOG.info("Started JMX server on port " + port + ".");
return server;
} catch (IOException ioe) { //assume port conflict
LOG.debug("Could not start JMX server on port " + port + ".", ioe);
try {
server.stop();
} catch (Exception e) {
LOG.debug("Could not stop JMX server.", e);
}
}
}
throw new RuntimeException("Could not start JMX server on any configured port.");
}
@Override
public void close() {}
public void close() {
if (jmxServer != null) {
try {
jmxServer.stop();
} catch (IOException e) {
LOG.error("Failed to stop JMX server.", e);
}
}
}
// ------------------------------------------------------------------------
// adding / removing metrics
......@@ -265,4 +312,74 @@ public class JMXReporter implements MetricReporter {
return gauge.getValue();
}
}
/**
* JMX Server implementation that JMX clients can connect to.
*
* Heavily based on j256 simplejmx project
*
* https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
*/
private static class JMXServer {
private Registry rmiRegistry;
private JMXConnectorServer connector;
public void start(int port) throws IOException {
if (rmiRegistry != null && connector != null) {
LOG.debug("JMXServer is already running.");
return;
}
startRmiRegistry(port);
startJmxService(port);
}
/**
* Starts an RMI Registry that allows clients to lookup the JMX IP/port.
*
* @param port rmi port to use
* @throws IOException
*/
private void startRmiRegistry(int port) throws IOException {
rmiRegistry = LocateRegistry.createRegistry(port);
}
/**
* Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
*
* @param port jmx port to use
* @throws IOException
*/
private void startJmxService(int port) throws IOException {
String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
JMXServiceURL url;
try {
url = new JMXServiceURL(serviceUrl);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
}
connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
connector.start();
}
public void stop() throws IOException {
if (connector != null) {
try {
connector.stop();
} finally {
connector = null;
}
}
if (rmiRegistry != null) {
try {
UnicastRemoteObject.unexportObject(rmiRegistry, true);
} catch (NoSuchObjectException e) {
throw new IOException("Could not un-export our RMI registry", e);
} finally {
rmiRegistry = null;
}
}
}
}
}
......@@ -45,6 +45,7 @@ public class JobGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName",
jmGroup.getScopeString());
registry.shutdown();
}
@Test
......@@ -66,6 +67,7 @@ public class JobGroupTest {
assertEquals(
"some-constant.myJobName",
jmGroup.getScopeString());
registry.shutdown();
}
@Test
......@@ -87,5 +89,6 @@ public class JobGroupTest {
assertEquals(
"peter.test-tm-id.some-constant." + jid,
jmGroup.getScopeString());
registry.shutdown();
}
}
......@@ -39,7 +39,9 @@ public class MetricGroupRegistrationTest {
Configuration config = new Configuration();
config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName());
MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
MetricRegistry registry = new MetricRegistry(config);
MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
Counter counter = root.counter("counter");
assertEquals(counter, TestReporter1.lastPassedMetric);
......@@ -54,6 +56,8 @@ public class MetricGroupRegistrationTest {
Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
assertEquals("gauge", TestReporter1.lastPassedName);
registry.shutdown();
}
public static class TestReporter1 extends TestReporter {
......@@ -75,8 +79,12 @@ public class MetricGroupRegistrationTest {
public void testInvalidMetricName() {
Configuration config = new Configuration();
MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
MetricRegistry registry = new MetricRegistry(config);
MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
root.counter("=)(/!");
registry.shutdown();
}
/**
......@@ -86,7 +94,9 @@ public class MetricGroupRegistrationTest {
public void testDuplicateGroupName() {
Configuration config = new Configuration();
MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id");
MetricRegistry registry = new MetricRegistry(config);
MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
MetricGroup group1 = root.addGroup("group");
MetricGroup group2 = root.addGroup("group");
......
......@@ -24,16 +24,30 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class MetricGroupTest {
private final MetricRegistry registry = new MetricRegistry(new Configuration());
private MetricRegistry registry;
private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
@Before
public void createRegistry() {
this.registry = new MetricRegistry(new Configuration());
}
@After
public void shutdownRegistry() {
this.registry.shutdown();
this.registry = null;
}
@Test
public void sameGroupOnNameCollision() {
GenericMetricGroup group = new GenericMetricGroup(
......
......@@ -47,5 +47,7 @@ public class OperatorGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11",
opGroup.getScopeString());
registry.shutdown();
}
}
......@@ -27,6 +27,8 @@ import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
import org.apache.flink.util.AbstractID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
......@@ -37,6 +39,18 @@ public class TaskGroupTest {
// ------------------------------------------------------------------------
// scope tests
// ------------------------------------------------------------------------
private MetricRegistry registry;
@Before
public void createRegistry() {
this.registry = new MetricRegistry(new Configuration());
}
@After
public void shutdownRegistry() {
this.registry.shutdown();
this.registry = null;
}
@Test
public void testGenerateScopeDefault() {
......@@ -56,6 +70,7 @@ public class TaskGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13",
taskGroup.getScopeString());
registry.shutdown();
}
@Test
......@@ -82,6 +97,7 @@ public class TaskGroupTest {
assertEquals(
String.format("test-tm-id.%s.%s.%s", jid, vertexId, executionId),
taskGroup.getScopeString());
registry.shutdown();
}
@Test
......@@ -110,5 +126,6 @@ public class TaskGroupTest {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13",
taskGroup.getScopeString());
registry.shutdown();
}
}
......@@ -36,8 +36,10 @@ public class TaskManagerGroupTest {
@Test
public void addAndRemoveJobs() {
MetricRegistry registry = new MetricRegistry(new Configuration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString());
registry, "localhost", new AbstractID().toString());
final JobID jid1 = new JobID();
......@@ -87,12 +89,15 @@ public class TaskManagerGroupTest {
assertTrue(tmGroup13.parent().isClosed());
assertEquals(0, group.numRegisteredJobMetricGroups());
registry.shutdown();
}
@Test
public void testCloseClosesAll() {
MetricRegistry registry = new MetricRegistry(new Configuration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString());
registry, "localhost", new AbstractID().toString());
final JobID jid1 = new JobID();
......@@ -118,6 +123,8 @@ public class TaskManagerGroupTest {
assertTrue(tmGroup11.isClosed());
assertTrue(tmGroup12.isClosed());
assertTrue(tmGroup21.isClosed());
registry.shutdown();
}
// ------------------------------------------------------------------------
......@@ -131,6 +138,7 @@ public class TaskManagerGroupTest {
assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents());
assertEquals("localhost.taskmanager.id", group.getScopeString());
registry.shutdown();
}
@Test
......@@ -141,5 +149,6 @@ public class TaskManagerGroupTest {
assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents());
assertEquals("constant.host.foo.host", group.getScopeString());
registry.shutdown();
}
}
......@@ -18,9 +18,24 @@
package org.apache.flink.metrics.reporter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.util.TestReporter;
import org.junit.Test;
import static org.junit.Assert.*;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.lang.management.ManagementFactory;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
import static org.junit.Assert.assertEquals;
public class JMXReporterTest {
......@@ -46,9 +61,131 @@ public class JMXReporterTest {
*/
@Test
public void testGenerateName() {
String[] scope = { "value0", "value1", "\"value2 (test),=;:?'" };
String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"};
String jmxName = JMXReporter.generateJmxName("TestMetric", scope);
assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName);
}
/**
* Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer.
*
* @throws Exception if the attribute/mbean could not be found or the test is broken
*/
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(KEY_METRICS_REPORTER_CLASS, TestReporter.class.getName());
MetricRegistry reg = new MetricRegistry(cfg);
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
JMXReporter rep1 = new JMXReporter();
JMXReporter rep2 = new JMXReporter();
Configuration cfg1 = new Configuration();
Configuration cfg2 = new Configuration();
rep1.open(cfg1);
rep2.open(cfg2);
rep1.notifyOfAddedMetric(new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1;
}
}, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
rep2.notifyOfAddedMetric(new Gauge<Integer>() {
@Override
public Integer getValue() {
return 2;
}
}, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
rep1.close();
rep2.close();
reg.shutdown();
}
/**
* Verifies that we can connect to multiple JMXReporters running on the same machine.
*
* @throws Exception
*/
@Test
public void testJMXAvailability() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(KEY_METRICS_REPORTER_CLASS, TestReporter.class.getName());
MetricRegistry reg = new MetricRegistry(cfg);
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
JMXReporter rep1 = new JMXReporter();
JMXReporter rep2 = new JMXReporter();
int port1 = 9010;
int port2 = 9011;
Configuration cfg1 = new Configuration();
cfg1.setString(KEY_METRICS_JMX_PORT, String.valueOf(port1));
Configuration cfg2 = new Configuration();
cfg2.setString(KEY_METRICS_JMX_PORT, String.valueOf(port2));
rep1.open(cfg1);
rep2.open(cfg2);
rep1.notifyOfAddedMetric(new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1;
}
}, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
rep2.notifyOfAddedMetric(new Gauge<Integer>() {
@Override
public Integer getValue() {
return 2;
}
}, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + port1 + "/jndi/rmi://localhost:" + port1 + "/jmxrmi");
JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
url1 = null;
jmxCon1.close();
jmxCon1 = null;
mCon1 = null;
JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + port2 + "/jndi/rmi://localhost:" + port2 + "/jmxrmi");
JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
url2 = null;
jmxCon2.close();
jmxCon2 = null;
mCon2 = null;
rep1.close();
rep2.close();
reg.shutdown();
}
}
......@@ -107,8 +107,6 @@ KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_RECOVERY_MODE="recovery.mode"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"
KEY_METRICS_JMX_PORT="metrics.jmx.port"
########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################
......@@ -258,10 +256,6 @@ if [ -z "${RECOVERY_MODE}" ]; then
RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
fi
if [ -z "${JMX_PORT}" ]; then
JMX_PORT=$(readFromConfig ${KEY_METRICS_JMX_PORT} 9010 "${YAML_CONF}")
fi
# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
......
......@@ -23,7 +23,6 @@ USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zook
STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array
JMX_ARGS=""
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
......@@ -33,14 +32,10 @@ bin=`cd "$bin"; pwd`
case $DAEMON in
(jobmanager)
CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
if [ "${ARGS[3]}" == "local" ]; then
JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
fi
;;
(taskmanager)
CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
;;
(zookeeper)
......@@ -101,13 +96,12 @@ case $STARTSTOP in
count="${#active[@]}"
if [ ${count} -gt 0 ]; then
JMX_ARGS=""
echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
fi
fi
echo "Starting $DAEMON daemon on host $HOSTNAME."
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} ${JMX_ARGS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
mypid=$!
......
......@@ -26,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.event.EventListener;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -39,6 +40,7 @@ import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest(Task.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("unchecked")
public class BufferReaderTest {
......
......@@ -37,6 +37,7 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -57,6 +58,7 @@ import static org.junit.Assert.assertFalse;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class DataSinkTaskTest extends TaskTestBase {
private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
......
......@@ -44,11 +44,13 @@ import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class DataSourceTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
......
......@@ -46,11 +46,14 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class ChainTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
......
......@@ -66,7 +66,7 @@ import java.util.UUID;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
@PowerMockIgnore("javax.management.*")
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static File tmpDir;
......
......@@ -24,6 +24,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -31,6 +32,7 @@ import static org.junit.Assert.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ ResultPartitionWriter.class })
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class StreamIterationHeadTest {
@Test
......
......@@ -35,7 +35,7 @@ import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
@PowerMockIgnore("javax.management.*")
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
@Override
protected ListSink createSink() throws Exception {
......
......@@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -42,6 +43,7 @@ import static org.junit.Assert.*;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class StreamTaskTimerTest {
......
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -49,6 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class OneInputStreamTaskTest {
/**
......
......@@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class SourceStreamTaskTest {
......
......@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -46,6 +47,7 @@ import static org.junit.Assert.assertTrue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
@SuppressWarnings("serial")
public class StreamTaskAsyncCheckpointTest {
......
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class TwoInputStreamTaskTest {
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册