提交 31e0098a 编写于 作者: T Till Rohrmann

[FLINK-10773] Harden resume externalized checkpoint end-to-end test

Ignore the 'Artificial Failure' exceptions and rename
ExceptionThrowingFailureMapper into FailureMapper to avoid false
positive exception matchings.
上级 742658c2
......@@ -322,6 +322,8 @@ class DataStreamAllroundTestJobFactory {
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
private static final long serialVersionUID = -3154419724891779938L;
@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
......@@ -367,8 +369,8 @@ class DataStreamAllroundTestJobFactory {
return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), TEST_SIMULATE_FAILURE.defaultValue());
}
static MapFunction<Event, Event> createExceptionThrowingFailureMapper(ParameterTool pt) {
return new ExceptionThrowingFailureMapper<>(
static MapFunction<Event, Event> createFailureMapper(ParameterTool pt) {
return new FailureMapper<>(
pt.getLong(
TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),
TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
......
......@@ -41,7 +41,7 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
......@@ -68,7 +68,7 @@ public class DataStreamAllroundTestProgram {
private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper";
private static final String TIME_WINDOW_OPER_NAME = "TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper";
private static final String FAILURE_MAPPER_NAME = "FailureMapper";
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
......@@ -145,7 +145,7 @@ public class DataStreamAllroundTestProgram {
if (isSimulateFailures(pt)) {
eventStream3 = eventStream3
.map(createExceptionThrowingFailureMapper(pt))
.map(createFailureMapper(pt))
.setParallelism(1)
.name(FAILURE_MAPPER_NAME);
}
......
......@@ -30,7 +30,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
* of the operator can also be configured. Note that this also takes into account
* failures that were not triggered by this mapper, e.g. TaskManager failures.
*/
public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {
public class FailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {
private static final long serialVersionUID = -5286927943454740016L;
......@@ -41,7 +41,7 @@ public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> imp
private long numProcessedRecords;
private long numCompleteCheckpoints;
public ExceptionThrowingFailureMapper(
public FailureMapper(
long numProcessedRecordsFailureThreshold,
long numCompleteCheckpointsFailureThreshold,
int maxNumFailures) {
......
......@@ -347,6 +347,7 @@ function check_logs_for_exceptions {
| grep -v "java.lang.Exception: Execution was suspended" \
| grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \
| grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
| grep -v "java.lang.Exception: Artificial failure" \
| grep -ic "exception")
if [[ ${exception_count} -gt 0 ]]; then
echo "Found exception in log files:"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册