提交 168532ec 编写于 作者: S Stephan Ewen

[hotfix] [streaming] Various cleanups in StreamTask

  - Clean up generics
  - Clean and safe disposal of initialized resources
  - Add names to asynchronous materialization threads
  - Fix concurrent modification of  materialization threads set
上级 bfff86c8
......@@ -17,6 +17,8 @@
package org.apache.flink.streaming.runtime.tasks;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
......@@ -28,13 +30,11 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
......@@ -140,14 +140,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
* actual execution Thread. */
private volatile AsynchronousException asyncException;
protected Set<Thread> asyncCheckpointThreads;
/** The currently active background materialization threads */
private final Set<Thread> asyncCheckpointThreads = Collections.synchronizedSet(new HashSet<Thread>());
/** Flag to mark the task "in operation", in which case check
* needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
private volatile boolean isRunning;
private long recoveryTimestamp;
// ------------------------------------------------------------------------
// Life cycle methods for specific implementations
......@@ -167,21 +167,19 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
@Override
public final void invoke() throws Exception {
// --------------------------------------------------------------------
// Initialize
// --------------------------------------------------------------------
LOG.debug("Initializing {}", getName());
boolean initializationCompleted = false;
boolean disposed = false;
try {
AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
// -------- Initialize ---------
LOG.debug("Initializing {}", getName());
userClassLoader = getUserCodeClassLoader();
configuration = new StreamConfig(getTaskConfiguration());
accumulatorMap = accumulatorRegistry.getUserMap();
accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
headOperator = configuration.getStreamOperator(userClassLoader);
operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
operatorChain = new OperatorChain<>(this, headOperator,
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
if (headOperator != null) {
headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
......@@ -190,37 +188,16 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
timerService = Executors.newSingleThreadScheduledExecutor(
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
asyncCheckpointThreads = new HashSet<>();
// task specific initialization
init();
initializationCompleted = true;
}
finally {
if (!initializationCompleted) {
if (timerService != null) {
timerService.shutdownNow();
}
if (operatorChain != null) {
operatorChain.releaseOutputs();
}
}
}
// --------------------------------------------------------------------
// Invoke
// --------------------------------------------------------------------
LOG.debug("Invoking {}", getName());
boolean disposed = false;
try {
// first order of business is to initialize the state backend and to
// give operators back their state
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// first order of business is to give operators back their state
stateBackend = createStateBackend();
stateBackend.initializeForJob(getEnvironment());
restoreStateLazy();
restoreState();
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
......@@ -255,14 +232,31 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
disposed = true;
}
finally {
// clean up everything we initialized
isRunning = false;
timerService.shutdownNow();
for (Thread checkpointThread: asyncCheckpointThreads) {
checkpointThread.interrupt();
// stop all timers and threads
if (timerService != null) {
try {
timerService.shutdownNow();
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
LOG.error("Could not shut down timer service", t);
}
}
// stop all asynchronous checkpoint threads
try {
for (Thread checkpointThread : asyncCheckpointThreads) {
checkpointThread.interrupt();
}
asyncCheckpointThreads.clear();
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
LOG.error("Could not shut down async checkpoint threads", t);
}
asyncCheckpointThreads.clear();
// release the output resources. this method should never fail.
if (operatorChain != null) {
......@@ -270,13 +264,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
// we must! perform this cleanup
try {
cleanup();
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
LOG.error("Error during cleanup of stream task.");
LOG.error("Error during cleanup of stream task", t);
}
// if the operators were not disposed before, do a hard dispose
......@@ -333,14 +326,16 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
private void disposeAllOperators() {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
try {
if (operator != null) {
operator.dispose();
if (operatorChain != null) {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
try {
if (operator != null) {
operator.dispose();
}
}
catch (Throwable t) {
LOG.error("Error during disposal of stream operator.", t);
}
}
catch (Throwable t) {
LOG.error("Error during disposal of stream operator.", t);
}
}
}
......@@ -360,13 +355,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
if (!timerService.isTerminated()) {
LOG.warn("Timer service was not shut down. Shutting down in finalize().");
}
timerService.shutdown();
timerService.shutdownNow();
}
if (asyncCheckpointThreads != null) {
for (Thread checkpointThread : asyncCheckpointThreads) {
checkpointThread.interrupt();
}
for (Thread checkpointThread : asyncCheckpointThreads) {
checkpointThread.interrupt();
}
}
......@@ -417,7 +410,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
this.recoveryTimestamp = recoveryTimestamp;
}
public void restoreStateLazy() throws Exception {
private void restoreState() throws Exception {
if (lazyRestoreState != null) {
LOG.info("Restoring checkpointed state to task {}", getName());
......@@ -448,7 +441,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
@Override
@SuppressWarnings("unchecked,rawtypes")
public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
......@@ -497,26 +489,30 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// start a Thread that does the asynchronous materialization and
// then sends the checkpoint acknowledge
Thread checkpointThread = new Thread() {
String threadName = "Materialize checkpoint " + checkpointId + " for " + getName();
Thread checkpointThread = new Thread(threadName) {
@Override
public void run() {
try {
for (StreamTaskState state : states) {
if (state != null) {
if (state.getFunctionState() instanceof AsynchronousStateHandle) {
AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getFunctionState();
state.setFunctionState((StateHandle) asyncState.materialize());
AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
state.setFunctionState(asyncState.materialize());
}
if (state.getOperatorState() instanceof AsynchronousStateHandle) {
AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
state.setOperatorState((StateHandle) asyncState.materialize());
state.setOperatorState(asyncState.materialize());
}
}
}
StreamTaskStateList allStates = new StreamTaskStateList(states);
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
} catch (Exception e) {
LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
}
catch (Exception e) {
if (isRunning()) {
LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
}
if (asyncException == null) {
asyncException = new AsynchronousException(e);
}
......@@ -527,6 +523,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
};
asyncCheckpointThreads.add(checkpointThread);
checkpointThread.setDaemon(true);
checkpointThread.start();
}
return true;
......
......@@ -35,7 +35,6 @@ import java.io.Serializable;
import static org.junit.Assert.assertTrue;
public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册