提交 00e44eda 编写于 作者: S Sachin Goel 提交者: Maximilian Michels

[FLINK-2850] Limit the types of jobs which can run in detached mode

This disallows the following types of interactive programs in detached
mode:

1. More than one call to execute

2. Accessing job execution results like
accumulators, net runtime, etc. This effectively disables eager
execution functions such as count, print, collect, etc. too
上级 b7cf642b
......@@ -40,6 +40,7 @@ import java.util.Properties;
import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
......@@ -920,7 +921,18 @@ public class CliFrontend {
System.err.println("\n------------------------------------------------------------");
System.err.println(" The program finished with the following exception:\n");
t.printStackTrace();
if (t.getCause() instanceof InvalidProgramException) {
System.err.println(t.getCause().getMessage());
StackTraceElement[] trace = t.getCause().getStackTrace();
for (StackTraceElement ele: trace) {
System.err.println("\t" + ele.toString());
if (ele.getMethodName().equals("main")) {
break;
}
}
} else {
t.printStackTrace();
}
return 1;
}
......
......@@ -242,9 +242,8 @@ public class Client {
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
prog.getUserCodeClassLoader(), parallelism, true);
ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true));
// invoke here
try {
prog.invokeInteractiveModeForExecution();
......@@ -269,18 +268,18 @@ public class Client {
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
prog.getUserCodeClassLoader(), parallelism, false);
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false);
ContextEnvironment.setAsContext(factory);
// invoke here
try {
prog.invokeInteractiveModeForExecution();
return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
}
finally {
ContextEnvironment.unsetContext();
}
return new JobSubmissionResult(lastJobID);
}
else {
throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
......
......@@ -23,41 +23,30 @@ import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Execution Environment for remote execution with the Client.
* Execution Environment for remote execution with the Client in blocking fashion.
*/
public class ContextEnvironment extends ExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
protected final Client client;
private final Client client;
protected final List<URL> jarFilesToAttach;
private final List<URL> jarFilesToAttach;
private final List<URL> classpathsToAttach;
private final ClassLoader userCodeClassLoader;
private final boolean wait;
protected final List<URL> classpathsToAttach;
protected final ClassLoader userCodeClassLoader;
public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
ClassLoader userCodeClassLoader, boolean wait) {
ClassLoader userCodeClassLoader) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
this.classpathsToAttach = classpaths;
this.userCodeClassLoader = userCodeClassLoader;
this.wait = wait;
}
@Override
......@@ -65,17 +54,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
this.userCodeClassLoader);
if (wait) {
this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
return this.lastJobExecutionResult;
}
else {
JobSubmissionResult result = client.runDetached(toRun, getParallelism());
LOG.warn("Job was executed in detached mode, the results will be available on completion.");
this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
return this.lastJobExecutionResult;
}
this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
return this.lastJobExecutionResult;
}
@Override
......@@ -93,10 +73,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
jobID = JobID.generate();
}
public boolean isWait() {
return wait;
}
@Override
public String toString() {
return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
......@@ -114,63 +90,18 @@ public class ContextEnvironment extends ExecutionEnvironment {
public List<URL> getClasspaths(){
return classpathsToAttach;
}
public ClassLoader getUserCodeClassLoader() {
return userCodeClassLoader;
}
// --------------------------------------------------------------------------------------------
static void setAsContext(Client client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach,
ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
{
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(client, jarFilesToAttach,
classpathsToAttach, userCodeClassLoader, defaultParallelism, wait);
static void setAsContext(ContextEnvironmentFactory factory) {
initializeContextEnvironment(factory);
}
static void unsetContext() {
resetContextEnvironment();
}
// --------------------------------------------------------------------------------------------
/**
* The factory that instantiates the environment to be used when running jobs that are
* submitted through a pre-configured client connection.
* This happens for example when a job is submitted from the command line.
*/
public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
private final Client client;
private final List<URL> jarFilesToAttach;
private final List<URL> classpathsToAttach;
private final ClassLoader userCodeClassLoader;
private final int defaultParallelism;
private final boolean wait;
public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
boolean wait)
{
this.client = client;
this.jarFilesToAttach = jarFilesToAttach;
this.classpathsToAttach = classpathsToAttach;
this.userCodeClassLoader = userCodeClassLoader;
this.defaultParallelism = defaultParallelism;
this.wait = wait;
}
@Override
public ExecutionEnvironment createExecutionEnvironment() {
ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach,
userCodeClassLoader, wait);
if (defaultParallelism > 0) {
env.setParallelism(defaultParallelism);
}
return env;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import java.net.URL;
import java.util.List;
/**
* The factory that instantiates the environment to be used when running jobs that are
* submitted through a pre-configured client connection.
* This happens for example when a job is submitted from the command line.
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
private final Client client;
private final List<URL> jarFilesToAttach;
private final List<URL> classpathsToAttach;
private final ClassLoader userCodeClassLoader;
private final int defaultParallelism;
private final boolean wait;
private ExecutionEnvironment lastEnvCreated;
public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
boolean wait)
{
this.client = client;
this.jarFilesToAttach = jarFilesToAttach;
this.classpathsToAttach = classpathsToAttach;
this.userCodeClassLoader = userCodeClassLoader;
this.defaultParallelism = defaultParallelism;
this.wait = wait;
}
@Override
public ExecutionEnvironment createExecutionEnvironment() {
if (!wait && lastEnvCreated != null) {
throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
}
lastEnvCreated = wait ?
new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader) :
new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
if (defaultParallelism > 0) {
lastEnvCreated.setParallelism(defaultParallelism);
}
return lastEnvCreated;
}
public ExecutionEnvironment getLastEnvCreated() {
return lastEnvCreated;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.List;
import java.util.Map;
/**
* Execution Environment for remote execution with the Client in detached mode.
*/
public class DetachedEnvironment extends ContextEnvironment {
/** Keeps track of the program plan for the Client to access. */
private FlinkPlan detachedPlan;
private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
public DetachedEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
super(remoteConnection, jarFiles, classpaths, userCodeClassLoader);
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism()));
LOG.warn("Job was executed in detached mode, the results will be available on completion.");
this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE;
return this.lastJobExecutionResult;
}
public void setDetachedPlan(FlinkPlan plan) {
if (detachedPlan == null) {
detachedPlan = plan;
} else {
throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE +
DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
}
}
/**
* Finishes this Context Environment's execution by explicitly running the plan constructed.
*/
JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
}
public static final class DetachedJobExecutionResult extends JobExecutionResult {
public static final DetachedJobExecutionResult INSTANCE = new DetachedJobExecutionResult();
static final String DETACHED_MESSAGE = "Job was submitted in detached mode. ";
static final String EXECUTE_TWICE_MESSAGE = "Only one call to execute is allowed. ";
static final String EAGER_FUNCTION_MESSAGE = "Please make sure your program doesn't call " +
"an eager execution function [collect, print, printToErr, count]. ";
static final String JOB_RESULT_MESSAGE = "Results of job execution, such as accumulators," +
" runtime, job id etc. are not available. ";
private DetachedJobExecutionResult() {
super(null, -1, null);
}
@Override
public long getNetRuntime() {
throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
}
@Override
public <T> T getAccumulatorResult(String accumulatorName) {
throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE + EAGER_FUNCTION_MESSAGE);
}
@Override
public Map<String, Object> getAllAccumulatorResults() {
throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
}
@Override
public Integer getIntCounterResult(String accumulatorName) {
throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
}
@Override
public JobID getJobID() {
throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
}
}
}
......@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
......@@ -75,6 +76,9 @@ public class ClientTest {
private ActorSystem jobManagerSystem;
private static final String ACCUMULATOR_NAME = "test_accumulator";
private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";
@Before
public void setUp() throws Exception {
......@@ -117,6 +121,75 @@ public class ClientTest {
}
}
/**
* Tests that invalid detached mode programs fail.
*/
@Test
public void testDetachedMode() throws Exception{
jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
Client out = new Client(config);
try {
PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class);
out.runDetached(prg, 1);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
e.getCause().getMessage());
}
try {
PackagedProgram prg = new PackagedProgram(TestEager.class);
out.runDetached(prg, 1);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
e.getCause().getMessage());
}
try {
PackagedProgram prg = new PackagedProgram(TestGetRuntime.class);
out.runDetached(prg, 1);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
e.getCause().getMessage());
}
try {
PackagedProgram prg = new PackagedProgram(TestGetJobID.class);
out.runDetached(prg, 1);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
e.getCause().getMessage());
}
try {
PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class);
out.runDetached(prg, 1);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
e.getCause().getMessage());
}
try {
PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class);
out.runDetached(prg, 1);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
e.getCause().getMessage());
}
}
/**
* This test verifies correct job submission messaging logic and plan translation calls.
*/
......@@ -304,4 +377,58 @@ public class ClientTest {
return "TestOptimizerPlan <input-file-path> <output-file-path>";
}
}
private static final class TestExecuteTwice {
public static void main(String args[]) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute();
env.fromElements(1, 2).collect();
}
}
private static final class TestEager {
public static void main(String args[]) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).collect();
}
}
private static final class TestGetRuntime {
public static void main(String args[]) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getNetRuntime();
}
}
private static final class TestGetJobID {
public static void main(String args[]) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getJobID();
}
}
private static final class TestGetAccumulator {
public static void main(String args[]) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getAccumulatorResult(ACCUMULATOR_NAME);
}
}
private static final class TestGetAllAccumulator {
public static void main(String args[]) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getAllAccumulatorResults();
}
}
}
......@@ -17,17 +17,13 @@
package org.apache.flink.streaming.api.environment;
import java.net.URL;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,42 +32,19 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
private final List<URL> jars;
private final ContextEnvironment ctx;
private final List<URL> classpaths;
private final Client client;
private final ClassLoader userCodeClassLoader;
private final boolean wait;
protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
boolean wait) {
this.client = client;
this.jars = jars;
this.classpaths = classpaths;
this.wait = wait;
this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
getClass().getClassLoader());
if (parallelism > 0) {
setParallelism(parallelism);
}
else {
// determine parallelism
protected StreamContextEnvironment(ContextEnvironment ctx) {
this.ctx = ctx;
if (ctx.getParallelism() > 0) {
setParallelism(ctx.getParallelism());
} else {
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
}
}
@Override
public JobExecutionResult execute() throws Exception {
return execute(DEFAULT_JOB_NAME);
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull("Streaming Job name should not be null.");
......@@ -82,12 +55,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
transformations.clear();
// execute the programs
if (wait) {
return client.runBlocking(streamGraph, jars, classpaths, userCodeClassLoader);
} else {
JobSubmissionResult result = client.runDetached(streamGraph, jars, classpaths, userCodeClassLoader);
if (ctx instanceof DetachedEnvironment) {
LOG.warn("Job was executed in detached mode, the results will be available on completion.");
return JobExecutionResult.fromJobSubmissionResult(result);
((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
} else {
return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader());
}
}
}
......@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
......@@ -70,7 +69,6 @@ import org.apache.flink.util.SplittableIterator;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -1272,9 +1270,7 @@ public abstract class StreamExecutionEnvironment {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
ContextEnvironment ctx = (ContextEnvironment) env;
return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(),
ctx.getParallelism(), ctx.isWait());
return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
return new StreamPlanEnvironment(env);
} else {
......@@ -1282,12 +1278,6 @@ public abstract class StreamExecutionEnvironment {
}
}
private static StreamExecutionEnvironment createContextEnvironment(
Client client, List<URL> jars, List<URL> classpaths, int parallelism, boolean wait)
{
return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait);
}
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册