Commit 9d238e1a authored by Stephan Ewen

[FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers

Parent e2960949
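For orientation, the persist/recover/commit lifecycle exercised by the tests in this commit looks roughly like the following sketch against the public RecoverableWriter API (the target path and payloads are placeholders; this is not code from the commit):

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

import java.nio.charset.StandardCharsets;

/** Sketch of the recoverable-writer lifecycle; not part of this commit. */
public class RecoverableWriterLifecycleSketch {

    public static void main(String[] args) throws Exception {
        final FileSystem fs = FileSystem.getLocalFileSystem();
        final RecoverableWriter writer = fs.createRecoverableWriter();

        // hypothetical target path
        final Path target = new Path("/tmp/recoverable-writer-demo/part-0");

        final RecoverableFsDataOutputStream out = writer.open(target);
        out.write("some data".getBytes(StandardCharsets.UTF_8));

        // persist() checkpoints the stream and returns a resumable handle
        final RecoverableWriter.ResumeRecoverable resumable = out.persist();
        out.write("written after persist, gone after recovery".getBytes(StandardCharsets.UTF_8));

        // after a failure, a fresh writer resumes from the handle;
        // the in-progress file is truncated back to the persisted offset
        final RecoverableWriter restored = fs.createRecoverableWriter();
        final RecoverableFsDataOutputStream resumed = restored.recover(resumable);
        resumed.write("more data".getBytes(StandardCharsets.UTF_8));

        // closeForCommit() seals the in-progress file, commit() publishes it
        resumed.closeForCommit().commit();
    }
}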
@@ -57,11 +57,11 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
- throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+ throw new FileNotFoundException("File Not Found: " + tempFile);
}
if (tempFile.length() < resumable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ throw new IOException("Missing data in tmp file: " + tempFile);
}
this.fos = new FileOutputStream(this.tempFile, true);
@@ -165,6 +165,8 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
try (FileOutputStream fos = new FileOutputStream(src, true)) {
fos.getChannel().truncate(expectedLength);
}
+ } else if (src.length() < expectedLength) {
+ throw new IOException("Missing data in tmp file: " + src);
}
// source still exists, so no renaming happened yet. do it!
......
@@ -18,6 +18,8 @@
package org.apache.flink.core.io;
+ import org.apache.flink.annotation.Internal;
import java.io.IOException;
/**
@@ -44,6 +46,7 @@ import java.io.IOException;
*
* @param <E> The data type serialized / deserialized by this serializer.
*/
+ @Internal
public interface SimpleVersionedSerializer<E> extends Versioned {
/**
......
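The @Internal annotation added above marks SimpleVersionedSerializer as a non-user-facing API. Implementing it is straightforward; a hypothetical serializer for plain strings (not part of the commit, and assuming the interface's getVersion/serialize/deserialize signatures as they are used in the tests below) illustrates the version handshake that the recoverable-writer serializers rely on:

import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical example implementation; not part of this commit. */
public class StringVersionedSerializer implements SimpleVersionedSerializer<String> {

    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        // stored alongside the serialized bytes and passed back on deserialize
        return VERSION;
    }

    @Override
    public byte[] serialize(String value) throws IOException {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unrecognized version: " + version);
        }
        return new String(serialized, StandardCharsets.UTF_8);
    }
}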
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.junit.Assert.fail;
/**
* A base test-suite for the {@link RecoverableWriter}.
* This should be subclassed to test each filesystem specific writer.
*/
public abstract class AbstractResumableWriterTest extends TestLogger {
private static final Random RND = new Random();
private static final String testData1 = "THIS IS A TEST 1.";
private static final String testData2 = "THIS IS A TEST 2.";
private static final String testData3 = "THIS IS A TEST 3.";
private Path basePathForTest;
private static FileSystem fileSystem;
public abstract Path getBasePath() throws Exception;
public abstract FileSystem initializeFileSystem();
public Path getBasePathForTest() {
return basePathForTest;
}
private FileSystem getFileSystem() {
if (fileSystem == null) {
fileSystem = initializeFileSystem();
}
return fileSystem;
}
private RecoverableWriter getNewFileSystemWriter() throws IOException {
return getFileSystem().createRecoverableWriter();
}
@Before
public void prepare() throws Exception {
basePathForTest = new Path(getBasePath(), randomName());
getFileSystem().mkdirs(basePathForTest);
}
@After
public void cleanup() throws Exception {
getFileSystem().delete(basePathForTest, true);
}
@Test
public void testCloseWithNoData() throws Exception {
final RecoverableWriter writer = getNewFileSystemWriter();
final Path testDir = getBasePathForTest();
final Path path = new Path(testDir, "part-0");
final RecoverableFsDataOutputStream stream = writer.open(path);
for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
Assert.assertTrue(fileContents.getValue().isEmpty());
}
stream.closeForCommit().commit();
for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertTrue(fileContents.getValue().isEmpty());
}
}
@Test
public void testCommitAfterNormalClose() throws Exception {
final RecoverableWriter writer = getNewFileSystemWriter();
final Path testDir = getBasePathForTest();
final Path path = new Path(testDir, "part-0");
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(testData1, fileContents.getValue());
}
}
}
@Test
public void testCommitAfterPersist() throws Exception {
final RecoverableWriter writer = getNewFileSystemWriter();
final Path testDir = getBasePathForTest();
final Path path = new Path(testDir, "part-0");
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.persist();
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(testData1 + testData2, fileContents.getValue());
}
}
}
// TESTS FOR RECOVERY
private static final String INIT_EMPTY_PERSIST = "EMPTY";
private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
@Test
public void testRecoverWithEmptyState() throws Exception {
testResumeAfterMultiplePersist(
INIT_EMPTY_PERSIST,
"",
testData3);
}
@Test
public void testRecoverWithState() throws Exception {
testResumeAfterMultiplePersist(
INTERM_WITH_STATE_PERSIST,
testData1,
testData1 + testData3);
}
@Test
public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
testResumeAfterMultiplePersist(
INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
testData1,
testData1 + testData3);
}
@Test
public void testRecoverAfterMultiplePersistsState() throws Exception {
testResumeAfterMultiplePersist(
FINAL_WITH_EXTRA_STATE,
testData1 + testData2,
testData1 + testData2 + testData3);
}
private void testResumeAfterMultiplePersist(
final String persistName,
final String expectedPostRecoveryContents,
final String expectedFinalContents) throws Exception {
final Path testDir = getBasePathForTest();
final Path path = new Path(testDir, "part-0");
final RecoverableWriter initWriter = getNewFileSystemWriter();
final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
// and write some more data
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
}
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
final RecoverableWriter newWriter = getNewFileSystemWriter();
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer();
final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
// we expect the data to be truncated
Map<Path, String> files = getFileContentByPath(testDir);
Assert.assertEquals(1L, files.size());
for (Map.Entry<Path, String> fileContents : files.entrySet()) {
Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
Assert.assertEquals(expectedPostRecoveryContents, fileContents.getValue());
}
recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
recoveredStream.closeForCommit().commit();
files = getFileContentByPath(testDir);
Assert.assertEquals(1L, files.size());
for (Map.Entry<Path, String> fileContents : files.entrySet()) {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(expectedFinalContents, fileContents.getValue());
}
}
}
@Test
public void testCommitAfterRecovery() throws Exception {
final Path testDir = getBasePathForTest();
final Path path = new Path(testDir, "part-0");
final RecoverableWriter initWriter = getNewFileSystemWriter();
final RecoverableWriter.CommitRecoverable recoverable;
try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.persist();
stream.persist();
// and write some more data
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
recoverable = stream.closeForCommit().getRecoverable();
}
final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable);
// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
final RecoverableWriter newWriter = getNewFileSystemWriter();
final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer = newWriter.getCommitRecoverableSerializer();
final RecoverableWriter.CommitRecoverable recoveredRecoverable = deserializer.deserialize(deserializer.getVersion(), serializedRecoverable);
final RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable);
committer.commitAfterRecovery();
Map<Path, String> files = getFileContentByPath(testDir);
Assert.assertEquals(1L, files.size());
for (Map.Entry<Path, String> fileContents : files.entrySet()) {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(testData1 + testData2, fileContents.getValue());
}
}
// TESTS FOR EXCEPTIONS
@Test(expected = IOException.class)
public void testExceptionWritingAfterCloseForCommit() throws Exception {
final Path testDir = getBasePathForTest();
final RecoverableWriter writer = getNewFileSystemWriter();
final Path path = new Path(testDir, "part-0");
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().getRecoverable();
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
fail();
}
}
@Test(expected = IOException.class)
public void testResumeAfterCommit() throws Exception {
final Path testDir = getBasePathForTest();
final RecoverableWriter writer = getNewFileSystemWriter();
final Path path = new Path(testDir, "part-0");
RecoverableWriter.ResumeRecoverable recoverable;
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
recoverable = stream.persist();
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
}
// this should throw an exception as the file is already committed
writer.recover(recoverable);
fail();
}
@Test
public void testResumeWithWrongOffset() throws Exception {
// this is a rather unrealistic scenario, but it is to trigger
// truncation of the file and try to resume with missing data.
final Path testDir = getBasePathForTest();
final RecoverableWriter writer = getNewFileSystemWriter();
final Path path = new Path(testDir, "part-0");
final RecoverableWriter.ResumeRecoverable recoverable1;
final RecoverableWriter.ResumeRecoverable recoverable2;
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
recoverable1 = stream.persist();
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
recoverable2 = stream.persist();
stream.write(testData3.getBytes(StandardCharsets.UTF_8));
}
try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {
// this should work fine
} catch (Exception e) {
fail();
}
// this should throw an exception
try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable2)) {
fail();
} catch (IOException e) {
// we expect this
return;
}
fail();
}
private Map<Path, String> getFileContentByPath(Path directory) throws IOException {
Map<Path, String> contents = new HashMap<>();
final FileStatus[] filesInBucket = getFileSystem().listStatus(directory);
for (FileStatus file : filesInBucket) {
final long fileLength = file.getLen();
byte[] serContents = new byte[(int) fileLength];
FSDataInputStream stream = getFileSystem().open(file.getPath());
stream.read(serContents);
contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8));
}
return contents;
}
private static String randomName() {
return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs.local;
import org.apache.flink.core.fs.AbstractResumableWriterTest;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
/**
* Tests for the {@link LocalRecoverableWriter}.
*/
public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
@Override
public Path getBasePath() throws Exception {
return new Path(TEMP_FOLDER.newFolder().toURI());
}
@Override
public FileSystem initializeFileSystem() {
return FileSystem.getLocalFileSystem();
}
}
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.fs.hdfs;
+ import org.apache.flink.annotation.Internal;
+ import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+ import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.time.StopWatch;
@@ -38,13 +40,18 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+ import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's
* file system abstraction.
*/
+ @Internal
class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
- private static final long LEASE_TIMEOUT = 100000L;
+ private static final long LEASE_TIMEOUT = 100_000L;
private static Method truncateHandle;
@@ -79,16 +86,23 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
- // the getFileStatus will throw a FileNotFound exception if the file is not there.
- final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
- if (tmpFileStatus.getLen() < recoverable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ // truncate back and append
+ try {
+ truncate(fs, tempFile, recoverable.offset());
+ } catch (Exception e) {
+ throw new IOException("Missing data in tmp file: " + tempFile, e);
}
- // truncate back and append
- truncate(fs, tempFile, recoverable.offset());
+ waitUntilLeaseIsRevoked(tempFile);
out = fs.append(tempFile);
+ // sanity check
+ long pos = out.getPos();
+ if (pos != recoverable.offset()) {
+ IOUtils.closeQuietly(out);
+ throw new IOException("Truncate failed: " + tempFile +
+ " (requested=" + recoverable.offset() + " ,size=" + pos + ')');
+ }
}
@Override
@@ -108,6 +122,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
@Override
public void sync() throws IOException {
out.hflush();
+ out.hsync();
}
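Why both calls: on HDFS, hflush() only pushes buffered bytes out to the datanodes so that new readers can see them; it does not force them to disk. The added hsync() additionally asks each datanode to sync the data to disk, which is what a persist-style checkpoint needs for durability. Annotated for clarity (the method body is from the hunk above; the comments are explanatory):

@Override
public void sync() throws IOException {
    out.hflush(); // visibility: flush client buffers out to the datanode pipeline
    out.hsync();  // durability: additionally force the datanodes to sync to disk
}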
@@ -243,9 +258,22 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
if (srcStatus != null) {
if (srcStatus.getLen() > expectedLength) {
- // can happen if we co from persist to recovering for commit directly
+ // can happen if we go from persist to recovering for commit directly
// truncate the trailing junk away
- truncate(fs, src, expectedLength);
+ try {
+ truncate(fs, src, expectedLength);
+ } catch (Exception e) {
+ // this can happen if the file is smaller than expected
+ throw new IOException("Problem while truncating file: " + src, e);
+ }
}
+ // rename to final location (if it exists, overwrite it)
+ try {
+ fs.rename(src, dest);
+ }
+ catch (IOException e) {
+ throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
+ }
}
else if (!fs.exists(dest)) {
@@ -281,23 +309,21 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
final DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.recoverLease(path);
- boolean isclosed = dfs.isFileClosed(path);
+ final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
final StopWatch sw = new StopWatch();
sw.start();
- while (!isclosed) {
- if (sw.getTime() > LEASE_TIMEOUT) {
- break;
- }
+ boolean isClosed = dfs.isFileClosed(path);
+ while (!isClosed && deadline.hasTimeLeft()) {
try {
Thread.sleep(500L);
} catch (InterruptedException e1) {
- // ignore it
+ throw new IOException("Recovering the lease failed: ", e1);
}
- isclosed = dfs.isFileClosed(path);
+ isClosed = dfs.isFileClosed(path);
}
- return isclosed;
+ return isClosed;
}
}
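The lease handling above addresses HDFS append semantics: after a crash, the previous writer may still hold the lease on the temp file, and fs.append() would fail until it is released. recoverLease() asks the namenode to start lease recovery, and the loop polls isFileClosed() until the file is closed or the LEASE_TIMEOUT deadline expires; the rewritten loop also surfaces an interrupt as an IOException instead of silently swallowing it.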
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.util;
import org.apache.flink.configuration.ConfigConstants;
+ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.Text;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.AbstractResumableWriterTest;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import java.io.File;
/**
* Tests for the {@link HadoopRecoverableWriter}.
*/
public class HadoopResumableWriterTest extends AbstractResumableWriterTest {
@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
private static MiniDFSCluster hdfsCluster;
/** The cached file system instance. */
private static FileSystem fileSystem;
private static Path basePath;
@BeforeClass
public static void testHadoopVersion() {
Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
}
@BeforeClass
public static void verifyOS() {
Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
}
@BeforeClass
public static void createHDFS() throws Exception {
final File baseDir = TEMP_FOLDER.newFolder();
final Configuration hdConf = new Configuration();
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
fileSystem = new HadoopFileSystem(hdfs);
basePath = new Path(hdfs.getUri() + "/tests");
}
@AfterClass
public static void destroyHDFS() throws Exception {
if (hdfsCluster != null) {
hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
hdfsCluster.shutdown();
}
}
@Override
public Path getBasePath() {
return basePath;
}
@Override
public FileSystem initializeFileSystem() {
return fileSystem;
}
}