提交 e8d1aa57 编写于 作者: P Piotr Nowojski 提交者: Tzu-Li (Gordon) Tai

[FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest...

[FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest stability by using custom in memory storage
上级 102537df
......@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.ContentDump;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.log4j.AppenderSkeleton;
......@@ -31,17 +32,9 @@ import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
......@@ -54,12 +47,10 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
......@@ -67,18 +58,15 @@ import static org.junit.Assert.fail;
*/
public class TwoPhaseCommitSinkFunctionTest {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private FileBasedSinkFunction sinkFunction;
private ContentDumpSinkFunction sinkFunction;
private OneInputStreamOperatorTestHarness<String, Object> harness;
private AtomicBoolean throwException = new AtomicBoolean();
private File targetDirectory;
private ContentDump targetDirectory;
private File tmpDirectory;
private ContentDump tmpDirectory;
private SettableClock clock;
......@@ -93,13 +81,22 @@ public class TwoPhaseCommitSinkFunctionTest {
loggingEvents = new ArrayList<>();
setupLogger();
targetDirectory = folder.newFolder("_target");
tmpDirectory = folder.newFolder("_tmp");
targetDirectory = new ContentDump();
tmpDirectory = new ContentDump();
clock = new SettableClock();
setUpTestHarness();
}
@After
public void tearDown() throws Exception {
closeTestHarness();
if (logger != null) {
logger.removeAppender(testAppender);
}
loggingEvents = null;
}
/**
* Setup {@link org.apache.log4j.Logger}, the default logger implementation for tests,
* to append {@link LoggingEvent}s to {@link #loggingEvents} so that we can assert if
......@@ -131,17 +128,8 @@ public class TwoPhaseCommitSinkFunctionTest {
logger.setLevel(Level.WARN);
}
@After
public void tearDown() throws Exception {
closeTestHarness();
if (logger != null) {
logger.removeAppender(testAppender);
}
loggingEvents = null;
}
private void setUpTestHarness() throws Exception {
sinkFunction = new FileBasedSinkFunction();
sinkFunction = new ContentDumpSinkFunction();
harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
harness.setup();
}
......@@ -162,7 +150,7 @@ public class TwoPhaseCommitSinkFunctionTest {
harness.notifyOfCompletedCheckpoint(1);
assertExactlyOnce(Arrays.asList("42", "43"));
assertEquals(2, tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
assertEquals(2, tmpDirectory.listFiles().size()); // one for checkpointId 2 and second for the currentTransaction
}
@Test
......@@ -173,21 +161,20 @@ public class TwoPhaseCommitSinkFunctionTest {
harness.processElement("43", 2);
OperatorStateHandles snapshot = harness.snapshot(1, 3);
assertTrue(tmpDirectory.setWritable(false));
tmpDirectory.setWritable(false);
try {
harness.processElement("44", 4);
harness.snapshot(2, 5);
fail("something should fail");
}
catch (Exception ex) {
if (!(ex.getCause() instanceof FileNotFoundException)) {
} catch (Exception ex) {
if (!(ex.getCause() instanceof ContentDump.NotWritableException)) {
throw ex;
}
// ignore
}
closeTestHarness();
assertTrue(tmpDirectory.setWritable(true));
tmpDirectory.setWritable(true);
setUpTestHarness();
harness.initializeState(snapshot);
......@@ -195,7 +182,7 @@ public class TwoPhaseCommitSinkFunctionTest {
assertExactlyOnce(Arrays.asList("42", "43"));
closeTestHarness();
assertEquals(0, tmpDirectory.listFiles().length);
assertEquals(0, tmpDirectory.listFiles().size());
}
@Test
......@@ -277,88 +264,68 @@ public class TwoPhaseCommitSinkFunctionTest {
private void assertExactlyOnce(List<String> expectedValues) throws IOException {
ArrayList<String> actualValues = new ArrayList<>();
for (File file : targetDirectory.listFiles()) {
actualValues.addAll(Files.readAllLines(file.toPath(), Charset.defaultCharset()));
for (String name : targetDirectory.listFiles()) {
actualValues.addAll(targetDirectory.read(name));
}
Collections.sort(actualValues);
Collections.sort(expectedValues);
assertEquals(expectedValues, actualValues);
}
private class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
private class ContentDumpSinkFunction extends TwoPhaseCommitSinkFunction<String, ContentTransaction, Void> {
public FileBasedSinkFunction() {
public ContentDumpSinkFunction() {
super(
new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()),
new KryoSerializer<>(ContentTransaction.class, new ExecutionConfig()),
VoidSerializer.INSTANCE, clock);
if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
throw new IllegalArgumentException();
}
}
@Override
protected void invoke(FileTransaction transaction, String value, Context context) throws Exception {
transaction.writer.write(value);
protected void invoke(ContentTransaction transaction, String value, Context context) throws Exception {
transaction.tmpContentWriter.write(value);
}
@Override
protected FileTransaction beginTransaction() throws Exception {
File tmpFile = new File(tmpDirectory, UUID.randomUUID().toString());
return new FileTransaction(tmpFile);
protected ContentTransaction beginTransaction() throws Exception {
return new ContentTransaction(tmpDirectory.createWriter(UUID.randomUUID().toString()));
}
@Override
protected void preCommit(FileTransaction transaction) throws Exception {
transaction.writer.flush();
transaction.writer.close();
protected void preCommit(ContentTransaction transaction) throws Exception {
transaction.tmpContentWriter.flush();
transaction.tmpContentWriter.close();
}
@Override
protected void commit(FileTransaction transaction) {
protected void commit(ContentTransaction transaction) {
if (throwException.get()) {
throw new RuntimeException("Expected exception");
}
try {
Files.move(
transaction.tmpFile.toPath(),
new File(targetDirectory, transaction.tmpFile.getName()).toPath(),
ATOMIC_MOVE);
} catch (IOException e) {
throw new IllegalStateException(e);
}
ContentDump.move(
transaction.tmpContentWriter.getName(),
tmpDirectory,
targetDirectory);
}
@Override
protected void abort(FileTransaction transaction) {
try {
transaction.writer.close();
} catch (IOException e) {
// ignore
}
transaction.tmpFile.delete();
}
@Override
protected void recoverAndAbort(FileTransaction transaction) {
transaction.tmpFile.delete();
protected void abort(ContentTransaction transaction) {
transaction.tmpContentWriter.close();
tmpDirectory.delete(transaction.tmpContentWriter.getName());
}
}
private static class FileTransaction {
private final File tmpFile;
private final transient BufferedWriter writer;
private static class ContentTransaction {
private ContentDump.ContentWriter tmpContentWriter;
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new FileWriter(tmpFile));
public ContentTransaction(ContentDump.ContentWriter tmpContentWriter) {
this.tmpContentWriter = tmpContentWriter;
}
@Override
public String toString() {
return String.format("FileTransaction[%s]", tmpFile.getName());
return String.format("ContentTransaction[%s]", tmpContentWriter.getName());
}
}
......@@ -399,5 +366,4 @@ public class TwoPhaseCommitSinkFunctionTest {
return Instant.ofEpochMilli(epochMilli);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Utility class to simulate in memory file like writes, flushes and closing.
*/
public class ContentDump {
private boolean writable = true;
private Map<String, List<String>> filesContent = new HashMap<>();
public Set<String> listFiles() {
return new HashSet<>(filesContent.keySet());
}
public void setWritable(boolean writable) {
this.writable = writable;
}
/**
* Creates an empty file.
*/
public ContentWriter createWriter(String name) {
checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
filesContent.put(name, new ArrayList<>());
return new ContentWriter(name, this);
}
public static void move(String name, ContentDump source, ContentDump target) {
Collection<String> content = source.read(name);
try (ContentWriter contentWriter = target.createWriter(name)) {
contentWriter.write(content).flush();
}
source.delete(name);
}
public void delete(String name) {
filesContent.remove(name);
}
public Collection<String> read(String name) {
List<String> content = filesContent.get(name);
checkState(content != null, "Unknown file [%s]", name);
List<String> result = new ArrayList<>(content);
return result;
}
private void putContent(String name, List<String> values) {
List<String> content = filesContent.get(name);
checkState(content != null, "Unknown file [%s]", name);
if (!writable) {
throw new NotWritableException(name);
}
content.addAll(values);
}
/**
* {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}.
*/
public static class ContentWriter implements AutoCloseable {
private final ContentDump contentDump;
private final String name;
private final List<String> buffer = new ArrayList<>();
private boolean closed = false;
private ContentWriter(String name, ContentDump contentDump) {
this.name = checkNotNull(name);
this.contentDump = checkNotNull(contentDump);
}
public String getName() {
return name;
}
public ContentWriter write(String value) {
checkState(!closed);
buffer.add(value);
return this;
}
public ContentWriter write(Collection<String> values) {
values.forEach(this::write);
return this;
}
public ContentWriter flush() {
contentDump.putContent(name, buffer);
return this;
}
public void close() {
buffer.clear();
closed = true;
}
}
/**
* Exception thrown for an attempt to write into read-only {@link ContentDump}.
*/
public class NotWritableException extends RuntimeException {
public NotWritableException(String name) {
super(String.format("File [%s] is not writable", name));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册