From 874a4af736c62e2be20b0b73d7b370ebeef36e73 Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Thu, 19 Mar 2020 11:46:46 +0300 Subject: [PATCH] [FLINK-16225] Add JVM Metaspace Error assumption test --- .../flink/test/util/TestProcessBuilder.java | 21 ++- .../runtime/util/ExceptionUtilsITCases.java | 121 ++++++++++++++++++ ...TaskManagerProcessFailureRecoveryTest.java | 2 +- .../ProcessFailureCancelingITCase.java | 4 +- 4 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java index b98f2b09937..2e2d2045c93 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java @@ -70,10 +70,12 @@ public class TestProcessBuilder { commands.addAll(mainClassArgs); StringWriter processOutput = new StringWriter(); + StringWriter errorOutput = new StringWriter(); Process process = new ProcessBuilder(commands).start(); - new PipeForwarder(process.getErrorStream(), processOutput); + new PipeForwarder(process.getInputStream(), processOutput); + new PipeForwarder(process.getErrorStream(), errorOutput); - return new TestProcess(process, processOutput); + return new TestProcess(process, processOutput, errorOutput); } public TestProcessBuilder setJvmMemory(MemorySize jvmMemory) { @@ -81,6 +83,11 @@ public class TestProcessBuilder { return this; } + public TestProcessBuilder addJvmArg(String arg) { + jvmArgs.add(arg); + return this; + } + public TestProcessBuilder addMainClassArg(String arg) { mainClassArgs.add(arg); return this; @@ -100,20 +107,26 @@ public class TestProcessBuilder { public static class TestProcess { private final Process process; private final StringWriter processOutput; + private final StringWriter errorOutput; - public TestProcess(Process process, StringWriter processOutput) { + public TestProcess(Process process, StringWriter processOutput, StringWriter errorOutput) { this.process = process; this.processOutput = processOutput; + this.errorOutput = errorOutput; } public Process getProcess() { return process; } - public StringWriter getOutput() { + public StringWriter getProcessOutput() { return processOutput; } + public StringWriter getErrorOutput() { + return errorOutput; + } + public void destroy() { process.destroy(); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java new file mode 100644 index 00000000000..045babb06c1 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.test.util.TestProcessBuilder; +import org.apache.flink.test.util.TestProcessBuilder.TestProcess; +import org.apache.flink.testutils.ClassLoaderUtils; +import org.apache.flink.testutils.ClassLoaderUtils.ClassLoaderBuilder; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.util.ArrayList; +import java.util.Collection; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link ExceptionUtils} which require to spawn JVM process and set JVM memory args. + */ +public class ExceptionUtilsITCases extends TestLogger { + private static final long INITIAL_BIG_METASPACE_SIZE = 32 * (1 << 20); // 32Mb + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Test + public void testIsMetaspaceOutOfMemoryError() throws IOException, InterruptedException { + // load only one class and record required Metaspace + long okMetaspace = Long.parseLong(run(1, INITIAL_BIG_METASPACE_SIZE)); + // load more classes to cause 'OutOfMemoryError: Metaspace' + assertThat(run(1000, okMetaspace), is("")); + } + + private static String run(int numberOfLoadedClasses, long metaspaceSize) throws InterruptedException, IOException { + TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(DummyClassLoadingProgram.class.getName()); + taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops"); + taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize)); + taskManagerProcessBuilder.addMainClassArg(Integer.toString(numberOfLoadedClasses)); + taskManagerProcessBuilder.addMainClassArg(TEMPORARY_FOLDER.getRoot().getAbsolutePath()); + TestProcess p = taskManagerProcessBuilder.start(); + p.getProcess().waitFor(); + assertThat(p.getErrorOutput().toString().trim(), is("")); + return p.getProcessOutput().toString().trim(); + } + + /** + * Dummy java program to generate Metaspace OOM. + */ + public static class DummyClassLoadingProgram { + private DummyClassLoadingProgram() { + } + + public static void main(String[] args) { + // trigger needed classes loaded + output(""); + ExceptionUtils.isMetaspaceOutOfMemoryError(new Exception()); + + Collection> classes = new ArrayList<>(); + int numberOfLoadedClasses = Integer.parseInt(args[0]); + try { + for (int index = 0; index < numberOfLoadedClasses; index++) { + classes.add(loadDummyClass(index, args[1])); + } + String out = classes.size() > 1 ? "Exception is not thrown, metaspace usage: " : ""; + output(out + getMetaspaceUsage()); + } catch (Throwable t) { + if (ExceptionUtils.isMetaspaceOutOfMemoryError(t)) { + return; + } + output("Wrong exception: " + t); + } + } + + private static Class loadDummyClass(int index, String folderToSaveSource) throws ClassNotFoundException, IOException { + String className = "DummyClass" + index; + String sourcePattern = "public class %s { @Override public String toString() { return \"%s\"; } }"; + ClassLoaderBuilder classLoaderBuilder = ClassLoaderUtils.withRoot(new File(folderToSaveSource)); + classLoaderBuilder.addClass(className, String.format(sourcePattern, className, "dummy")); + ClassLoader classLoader = classLoaderBuilder.build(); + return Class.forName(className, true, classLoader); + } + + private static long getMetaspaceUsage() { + for (MemoryPoolMXBean memoryMXBean : ManagementFactory.getMemoryPoolMXBeans()) { + if ("Metaspace".equals(memoryMXBean.getName())) { + return memoryMXBean.getUsage().getUsed(); + } + } + throw new RuntimeException("Metaspace usage is not found"); + } + + private static void output(String text) { + System.out.println(text); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index f69545b886d..d9c82303c0f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -258,7 +258,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test System.out.println("-----------------------------------------"); System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName); System.out.println("-----------------------------------------"); - System.out.println(process.getOutput().toString()); + System.out.println(process.getErrorOutput().toString()); System.out.println("-----------------------------------------"); System.out.println(" END SPAWNED PROCESS LOG"); System.out.println("-----------------------------------------"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 966b2fea841..9d1839511ef 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -237,11 +237,11 @@ public class ProcessFailureCancelingITCase extends TestLogger { // all seems well :-) } catch (Exception e) { - printProcessLog("TaskManager", taskManagerProcess.getOutput().toString()); + printProcessLog("TaskManager", taskManagerProcess.getErrorOutput().toString()); throw e; } catch (Error e) { - printProcessLog("TaskManager 1", taskManagerProcess.getOutput().toString()); + printProcessLog("TaskManager 1", taskManagerProcess.getErrorOutput().toString()); throw e; } finally { -- GitLab