提交 4afca4b3 编写于 作者: F Fabian Hueske

[FLINK-7656] [runtime] Switch to user classloader before calling...

[FLINK-7656] [runtime] Switch to user classloader before calling initializeOnMaster and finalizeOnMaster.

This closes #4690.
上级 d0debf4a
......@@ -73,15 +73,24 @@ public class OutputFormatVertex extends JobVertex {
catch (Throwable t) {
throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
// set user classloader before calling user code
final ClassLoader prevContextCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(loader);
try {
outputFormat.configure(cfg.getStubParameters());
}
catch (Throwable t) {
throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (outputFormat instanceof InitializeOnMaster) {
((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
// configure output format
try {
outputFormat.configure(cfg.getStubParameters());
} catch (Throwable t) {
throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (outputFormat instanceof InitializeOnMaster) {
((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
}
} finally {
// restore previous classloader
Thread.currentThread().setContextClassLoader(prevContextCl);
}
}
......@@ -107,15 +116,24 @@ public class OutputFormatVertex extends JobVertex {
catch (Throwable t) {
throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
// set user classloader before calling user code
final ClassLoader prevContextCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(loader);
try {
outputFormat.configure(cfg.getStubParameters());
}
catch (Throwable t) {
throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (outputFormat instanceof FinalizeOnMaster) {
((FinalizeOnMaster) outputFormat).finalizeGlobal(getParallelism());
// configure output format
try {
outputFormat.configure(cfg.getStubParameters());
} catch (Throwable t) {
throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (outputFormat instanceof FinalizeOnMaster) {
((FinalizeOnMaster) outputFormat).finalizeGlobal(getParallelism());
}
} finally {
// restore previous classloader
Thread.currentThread().setContextClassLoader(prevContextCl);
}
}
}
......@@ -18,12 +18,14 @@
package org.apache.flink.runtime.jobgraph;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
......@@ -33,6 +35,8 @@ import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import static org.junit.Assert.*;
......@@ -84,10 +88,10 @@ public class JobTaskVertexTest {
@Test
public void testOutputFormatVertex() {
try {
final TestingOutputFormat outputFormat = new TestingOutputFormat();
final OutputFormat outputFormat = new TestingOutputFormat();
final OutputFormatVertex of = new OutputFormatVertex("Name");
new TaskConfig(of.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
final ClassLoader cl = getClass().getClassLoader();
final ClassLoader cl = new TestClassLoader();
try {
of.initializeOnMaster(cl);
......@@ -97,19 +101,29 @@ public class JobTaskVertexTest {
}
OutputFormatVertex copy = InstantiationUtil.clone(of);
ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
try {
copy.initializeOnMaster(cl);
fail("Did not throw expected exception.");
} catch (TestException e) {
// all good
}
assertEquals("Previous classloader was not restored.", ctxCl, Thread.currentThread().getContextClassLoader());
try {
copy.finalizeOnMaster(cl);
fail("Did not throw expected exception.");
} catch (TestException e) {
// all good
}
assertEquals("Previous classloader was not restored.", ctxCl, Thread.currentThread().getContextClassLoader());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testInputFormatVertex() {
try {
......@@ -134,17 +148,8 @@ public class JobTaskVertexTest {
// --------------------------------------------------------------------------------------------
private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster {
@Override
public void initializeGlobal(int parallelism) throws IOException {
throw new TestException();
}
}
private static final class TestException extends IOException {}
// --------------------------------------------------------------------------------------------
private static final class TestSplit extends GenericInputSplit {
public TestSplit(int partitionNumber, int totalNumberOfPartitions) {
......@@ -169,4 +174,51 @@ public class JobTaskVertexTest {
return new GenericInputSplit[] { new TestSplit(0, 1) };
}
}
private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster, FinalizeOnMaster {
private boolean isConfigured = false;
@Override
public void initializeGlobal(int parallelism) throws IOException {
if (!isConfigured) {
throw new IllegalStateException("OutputFormat was not configured before initializeGlobal was called.");
}
if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
throw new IllegalStateException("Context ClassLoader was not correctly switched.");
}
// notify we have been here.
throw new TestException();
}
@Override
public void finalizeGlobal(int parallelism) throws IOException {
if (!isConfigured) {
throw new IllegalStateException("OutputFormat was not configured before finalizeGlobal was called.");
}
if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
throw new IllegalStateException("Context ClassLoader was not correctly switched.");
}
// notify we have been here.
throw new TestException();
}
@Override
public void configure(Configuration parameters) {
if (isConfigured) {
throw new IllegalStateException("OutputFormat is already configured.");
}
if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
throw new IllegalStateException("Context ClassLoader was not correctly switched.");
}
isConfigured = true;
}
}
private static class TestClassLoader extends URLClassLoader {
public TestClassLoader() {
super(new URL[0], Thread.currentThread().getContextClassLoader());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册