提交 68f41a0e 编写于 作者: S Stephan Ewen

[streaming] Fixed error logging in streaming tasks.

上级 f27025ba
......@@ -109,21 +109,22 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
LOG.debug("Task {} invocation finished", getName());
}
} catch (Exception e) {
}
catch (Exception e) {
LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
if (operatorOpen) {
try {
closeOperator();
} catch (Throwable t) {
LOG.info("Caught exception while closing operator.", e);
}
catch (Throwable t) {
LOG.warn("Exception while closing operator.", t);
}
}
if (LOG.isErrorEnabled()) {
LOG.error("StreamOperator failed.", e);
}
throw e;
} finally {
}
finally {
this.isRunning = false;
// Cleanup
inputs.clearBuffers();
......
......@@ -49,21 +49,21 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
LOG.debug("Task {} invocation finished", getName());
}
} catch (Exception e) {
}
catch (Exception e) {
LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
if (operatorOpen) {
try {
closeOperator();
} catch (Throwable t) {
LOG.info("Caught exception while closing operator.", e);
}
}
if (LOG.isErrorEnabled()) {
LOG.error("StreamOperator failed.", e);
catch (Throwable t) {
LOG.warn("Exception while closing operator.", t);
}
}
throw e;
} finally {
}
finally {
this.isRunning = false;
// Cleanup
outputHandler.flushOutputs();
......
......@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.collector.StreamOutput;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -88,16 +88,16 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
}
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Iteration source failed due to: {}", StringUtils.stringifyException(e));
}
}
catch (Exception e) {
LOG.error("Iteration Head " + getEnvironment().getTaskNameWithSubtasks() + " failed", e);
throw e;
} finally {
}
finally {
// Cleanup
outputHandler.flushOutputs();
clearBuffers();
}
}
}
......@@ -67,12 +67,12 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration sink {} invoke finished", getName());
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Iteration sink failed due to: {}", StringUtils.stringifyException(e));
}
}
catch (Exception e) {
LOG.error("Iteration tail " + getEnvironment().getTaskNameWithSubtasks() + " failed", e);
throw e;
} finally {
}
finally {
// Cleanup
clearBuffers();
}
......
......@@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.io.CoRecordReader;
import org.apache.flink.streaming.runtime.io.InputGateFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -99,21 +100,22 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
LOG.debug("Task {} invocation finished", getName());
}
} catch (Exception e) {
}
catch (Exception e) {
LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
if (operatorOpen) {
try {
closeOperator();
} catch (Throwable t) {
LOG.info("Caught exception while closing operator.", e);
}
catch (Throwable t) {
LOG.warn("Exception while closing operator.", t);
}
}
if (LOG.isErrorEnabled()) {
LOG.error("StreamOperator failed. ", e);
}
throw e;
} finally {
}
finally {
this.isRunning = false;
// Cleanup
outputHandler.flushOutputs();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册