提交 8eadd3ec 编写于 作者: T Till Rohrmann

Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output for maven verify.

上级 c175ebe8
......@@ -19,7 +19,6 @@ package org.apache.flink.streaming.util;
import java.net.InetSocketAddress;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
......
......@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarFile;
import akka.actor.ActorRef;
......@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import scala.concurrent.duration.FiniteDuration;
/**
* All classes in this package contain code taken from
......@@ -328,6 +330,9 @@ public class Client {
LOG.warn("Unable to find job manager port in configuration!");
jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
}
FiniteDuration timeout = new FiniteDuration(GlobalConfiguration.getInteger
(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT),
TimeUnit.SECONDS);
conf = Utils.initializeYarnConfiguration();
......@@ -520,7 +525,6 @@ public class Client {
// file that we write into the conf/ dir containing the jobManager address and the dop.
yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
......@@ -533,7 +537,8 @@ public class Client {
// start application client
LOG.info("Start application client.");
applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort,
yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded));
yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded,
timeout));
actorSystem.awaitTermination();
......
......@@ -35,7 +35,7 @@ import scala.concurrent.duration._
class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
confDirPath: String, slots: Int, numTaskManagers: Int,
dynamicPropertiesEncoded: String)
dynamicPropertiesEncoded: String, timeout: FiniteDuration)
extends Actor with Consumer with ActorLogMessages with ActorLogging {
import context._
......@@ -85,7 +85,7 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
writeYarnProperties(address)
jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address)))
jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout))
jobManager.get ! RegisterMessageListener
pollingTimer foreach {
......
......@@ -159,7 +159,7 @@ object ApplicationMaster{
val args = Array[String]("--configDir", pathToConfig)
LOG.info(s"Config path: ${pathToConfig}.")
val (hostname, port, configuration) = JobManager.parseArgs(args)
val (hostname, port, configuration, _) = JobManager.parseArgs(args)
implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration)
......
......@@ -60,6 +60,8 @@ public class LocalExecutor extends PlanExecutor {
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
private boolean printStatusDuringExecution = true;
// --------------------------------------------------------------------------------------------
......@@ -82,6 +84,10 @@ public class LocalExecutor extends PlanExecutor {
public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
public void setPrintStatusDuringExecution(boolean printStatus) {
this.printStatusDuringExecution = printStatus;
}
// --------------------------------------------------------------------------------------------
......@@ -164,7 +170,7 @@ public class LocalExecutor extends PlanExecutor {
ActorRef jobClient = flink.getJobClient();
return JobClient.submitJobAndWait(jobGraph, true, jobClient);
return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient);
}
finally {
if (shutDownAtEnd) {
......
......@@ -35,7 +35,7 @@ if [ ! -f "$HOSTLIST" ]; then
fi
# cluster mode, bring up job manager locally and a task manager on every slave host
"$FLINK_BIN_DIR"/jobManager.sh start cluster
"$FLINK_BIN_DIR"/jobmanager.sh start cluster
GOON=true
while $GOON
......
......@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# local mode, only bring up job manager. The job manager will start an internal task manager
"$FLINK_BIN_DIR"/jobManager.sh start local
"$FLINK_BIN_DIR"/jobmanager.sh start local
......@@ -46,4 +46,4 @@ do
done < $HOSTLIST
# cluster mode, stop the job manager locally and stop the task manager on every slave host
"$FLINK_BIN_DIR"/jobManager.sh stop
"$FLINK_BIN_DIR"/jobmanager.sh stop
......@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# stop local job manager (has an internal task manager)
"$FLINK_BIN_DIR"/jobManager.sh stop
"$FLINK_BIN_DIR"/jobmanager.sh stop
......@@ -55,7 +55,11 @@ public final class BlobClient implements Closeable {
public BlobClient(final InetSocketAddress serverAddress) throws IOException {
this.socket = new Socket();
this.socket.connect(serverAddress);
try {
this.socket.connect(serverAddress);
}catch(IOException e){
throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
}
}
/**
......
......@@ -101,15 +101,21 @@ public final class BlobServer extends Thread implements BlobService{
*/
public BlobServer() throws IOException {
this.serverSocket = new ServerSocket(0);
start();
try {
this.serverSocket = new ServerSocket(0);
start();
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Started BLOB server on port %d",
this.serverSocket.getLocalPort()));
}
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Started BLOB server on port %d",
this.serverSocket.getLocalPort()));
this.storageDir = BlobUtils.initStorageDirectory();
}catch(IOException e){
throw new IOException("Could not create BlobServer with random port.", e);
}
this.storageDir = BlobUtils.initStorageDirectory();
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
/**
* Implementing this interface allows classes to receive notifications about
* changes of a task's execution state.
*/
public interface ExecutionListener {
void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
ExecutionState newExecutionState, String optionalMessage);
}
......@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
......@@ -97,10 +96,6 @@ public class ExecutionGraph {
private final List<BlobKey> requiredJarFiles;
private final List<JobStatusListener> jobStatusListeners;
private final List<ExecutionListener> executionListeners;
private final List<ActorRef> jobStatusListenerActors;
private final List<ActorRef> executionListenerActors;
......@@ -150,8 +145,6 @@ public class ExecutionGraph {
this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
this.jobStatusListenerActors = new CopyOnWriteArrayList<ActorRef>();
this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
......@@ -638,14 +631,6 @@ public class ExecutionGraph {
// Listeners & Observers
// --------------------------------------------------------------------------------------------
public void registerJobStatusListener(JobStatusListener jobStatusListener) {
this.jobStatusListeners.add(jobStatusListener);
}
public void registerExecutionListener(ExecutionListener executionListener) {
this.executionListeners.add(executionListener);
}
public void registerJobStatusListener(ActorRef listener){
this.jobStatusListenerActors.add(listener);
......@@ -662,20 +647,6 @@ public class ExecutionGraph {
* @param error
*/
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListeners.size() > 0) {
String message = error == null ? null : ExceptionUtils.stringifyException(error);
for (JobStatusListener listener : this.jobStatusListeners) {
try {
listener.jobStatusHasChanged(this, newState, message);
}
catch (Throwable t) {
LOG.error("Notification of job status change caused an error.", t);
}
}
}
if(jobStatusListenerActors.size() > 0){
String message = error == null ? null : ExceptionUtils.stringifyException(error);
for(ActorRef listener: jobStatusListenerActors){
......@@ -696,17 +667,6 @@ public class ExecutionGraph {
*/
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
newExecutionState, Throwable error) {
if(executionListeners.size() >0){
String message = error == null ? null : ExceptionUtils.stringifyException(error);
for (ExecutionListener listener : this.executionListeners) {
try {
listener.executionStateChanged(jobID, vertexId, subtask,executionID, newExecutionState, message);
}catch(Throwable t){
LOG.error("Notification of execution state change caused an error.");
}
}
}
ExecutionJobVertex vertex = getJobVertex(vertexId);
if(executionListenerActors.size() >0){
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
/**
* This interface allows objects to receive notifications when the status of an observed job has changed.
*/
public interface JobStatusListener {
/**
* Called when the status of the job changed.
*
* @param executionGraph The executionGraph representing the job.
* @param newJobStatus The new job status.
* @param optionalMessage An optional message (possibly <code>null</code>) that can be attached to the state change.
*/
void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage);
}
......@@ -29,23 +29,24 @@ import org.apache.flink.runtime.taskmanager.Task;
public interface TaskManagerProfiler {
/**
* Registers an {@link org.apache.flink.runtime.execution.ExecutionListener} object for profiling.
* Registers a {@link org.apache.flink.runtime.taskmanager.Task} object for profiling.
*
* @param task
* task to be register a profiling listener for
* @param jobConfiguration
* the job configuration sent with the task
*/
void registerExecutionListener(Task task, Configuration jobConfiguration);
void registerTask(Task task, Configuration jobConfiguration);
/**
* Unregisters all previously register {@link org.apache.flink.runtime.execution.ExecutionListener} objects for
* the vertex identified by the given ID.
* Unregisters all previously registered {@link org.apache.flink.runtime.taskmanager.Task}
* objects for the vertex identified by the given ID.
*
* @param id
* the ID of the vertex to unregister the {@link org.apache.flink.runtime.execution.ExecutionListener} objects for
* the ID of the vertex to unregister the
* {@link org.apache.flink.runtime.taskmanager.Task} objects for
*/
void unregisterExecutionListener(ExecutionAttemptID id);
void unregisterTask(ExecutionAttemptID id);
/**
* Shuts done the task manager's profiling component
......
......@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
......@@ -62,9 +61,6 @@ public final class Task {
private final String taskName;
private final TaskManager taskManager;
private final List<ExecutionListener> executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
......@@ -354,7 +350,7 @@ public final class Task {
* the configuration attached to the job
*/
public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) {
taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
taskManagerProfiler.registerTask(this, jobConfiguration);
}
/**
......@@ -365,7 +361,7 @@ public final class Task {
*/
public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) {
if (taskManagerProfiler != null) {
taskManagerProfiler.unregisterExecutionListener(this.executionId);
taskManagerProfiler.unregisterTask(this.executionId);
}
}
......@@ -373,24 +369,10 @@ public final class Task {
// State Listeners
// --------------------------------------------------------------------------------------------
public void registerExecutionListener(ExecutionListener listener) {
if (listener == null) {
throw new IllegalArgumentException();
}
this.executionListeners.add(listener);
}
public void registerExecutionListener(ActorRef listener){
executionListenerActors.add(listener);
}
public void unregisterExecutionListener(ExecutionListener listener) {
if (listener == null) {
throw new IllegalArgumentException();
}
this.executionListeners.remove(listener);
}
public void unregisterExecutionListener(ActorRef listener){
executionListenerActors.remove(listener);
}
......@@ -400,15 +382,6 @@ public final class Task {
LOG.info(getTaskNameWithSubtasks() + " switched to " + newState + (message == null ? "" : " : " + message));
}
for (ExecutionListener listener : this.executionListeners) {
try {
listener.executionStateChanged(jobId, vertexId, subtaskIndex, executionId, newState, message);
}
catch (Throwable t) {
LOG.error("Error while calling execution listener.", t);
}
}
for(ActorRef listener: executionListenerActors){
listener.tell(new ExecutionGraphMessages.ExecutionStateChanged(
jobId, vertexId, taskName, numberOfSubtasks, subtaskIndex,
......
......@@ -378,10 +378,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val receiver = this.self
val taskName = runningTasks(executionID).getTaskName
val numberOfSubtasks = runningTasks(executionID).getNumberOfSubtasks
val indexOfSubtask = runningTasks(executionID).getSubtaskIndex
futureResponse.mapTo[Boolean].onComplete {
case Success(result) =>
if (!result || executionState == ExecutionState.FINISHED || executionState ==
......@@ -390,7 +386,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
case Failure(t) =>
log.error(t, s"Execution state change notification failed for task ${executionID} " +
s"($indexOfSubtask/$numberOfSubtasks) of job ${jobID}.")
s"of job ${jobID}.")
}
}
......
......@@ -51,6 +51,7 @@ public class LocalExecutorITCase {
LocalExecutor executor = new LocalExecutor();
executor.setDefaultOverwriteFiles(true);
executor.setTaskManagerNumSlots(DOP);
executor.setPrintStatusDuringExecution(false);
executor.start();
executor.executePlan(wc.getPlan(Integer.valueOf(DOP).toString(), inFile.toURI().toString(),
......
......@@ -30,44 +30,46 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Test;
@SuppressWarnings("serial")
public class DeltaIterationNotDependingOnSolutionSetITCase {
public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTestBase {
private final List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long,Long>>();
@Test
public void testDeltaIterationNotDependingOnSolutionSet() {
@Override
protected void testProgram() throws Exception {
try {
final List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long,Long>>();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<Tuple2<Long, Long>> input = env.generateSequence(0, 9).map(new Duplicator<Long>());
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 5, 1);
iteration.closeWith(iteration.getWorkset(), iteration.getWorkset().map(new TestMapper()))
.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(result));
.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(result));
env.execute();
boolean[] present = new boolean[50];
for (Tuple2<Long, Long> t : result) {
present[t.f0.intValue()] = true;
}
for (int i = 0; i < present.length; i++) {
assertTrue(String.format("Missing tuple (%d, %d)", i, i), present[i]);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Override
protected void postSubmit() {
boolean[] present = new boolean[50];
for (Tuple2<Long, Long> t : result) {
present[t.f0.intValue()] = true;
}
for (int i = 0; i < present.length; i++) {
assertTrue(String.format("Missing tuple (%d, %d)", i, i), present[i]);
}
}
private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
@Override
public Tuple2<T, T> map(T value) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册