Commit e2960949 authored by Stephan Ewen

[FLINK-9751] [filesystem] Add PersistentResumableWriter interface.

Parent 66e0a271
@@ -493,6 +493,24 @@ public abstract class FileSystem {
*/
public abstract FSDataInputStream open(Path f) throws IOException;
/**
* Creates a new {@link RecoverableWriter}. A recoverable writer creates streams that can
* persist and recover their intermediate state.
* Persisting and recovering intermediate state is a core building block for writing to
* files that span multiple checkpoints.
*
* <p>The returned object can act as a shared factory to open and recover multiple streams.
*
* <p>This method is optional on file systems and various file system implementations may
* not support this method, throwing an {@code UnsupportedOperationException}.
*
* @return A RecoverableWriter for this file system.
* @throws IOException Thrown, if the recoverable writer cannot be instantiated.
*/
public RecoverableWriter createRecoverableWriter() throws IOException {
throw new UnsupportedOperationException("This file system does not support recoverable writers.");
}
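Because support is optional, callers typically probe for the capability instead of assuming it. A minimal sketch, assuming Flink's existing Path#getFileSystem() accessor (only createRecoverableWriter() itself comes from this change; the helper method is hypothetical):

// Hypothetical helper: obtain a RecoverableWriter for a path, turning the
// capability signal into an IOException that the caller can handle uniformly.
static RecoverableWriter recoverableWriterFor(Path path) throws IOException {
    final FileSystem fileSystem = path.getFileSystem();
    try {
        return fileSystem.createRecoverableWriter();
    }
    catch (UnsupportedOperationException e) {
        // this file system cannot persist and recover intermediate stream state
        throw new IOException("File system does not support recoverable writers: " + fileSystem, e);
    }
}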
/**
* Return the number of bytes that large input files should optimally be split into to minimize I/O time.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import java.io.IOException;
/**
* An output stream to a file system that can be recovered at well-defined points.
* The stream initially writes to hidden files or temp files and only creates the
* target file once it is closed and "committed".
*/
public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
/**
* Ensures all data so far is persistent (similar to {@link #sync()}) and returns
* a handle to recover the stream at the current position.
*/
public abstract ResumeRecoverable persist() throws IOException;
/**
* Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
* This returns a Committer that can be used to publish (make visible) the file
* that the stream was writing to.
*/
public abstract Committer closeForCommit() throws IOException;
/**
* Closes the stream releasing all local resources, but not finalizing the
* file that the stream writes to.
*
* <p>This method should be understood as "close to dispose on failure".
*/
@Override
public abstract void close() throws IOException;
// ------------------------------------------------------------------------
/**
* A committer can publish the file of a stream that was closed.
* The Committer can be recovered via a {@link CommitRecoverable}.
*/
public interface Committer {
/**
* Commits the file, making it visible. The file will contain the exact data
* as when the committer was created.
*
* @throws IOException Thrown if committing fails.
*/
void commit() throws IOException;
/**
* Commits the file, making it visible. The file will contain the exact data
* as when the committer was created.
*
* <p>This method tolerates situations where the file was already committed and
* will not raise an exception in that case. This is important for idempotent
* commit retries as they need to happen after recovery.
*
* @throws IOException Thrown if committing fails.
*/
void commitAfterRecovery() throws IOException;
/**
* Gets a recoverable object to recover the committer. The recovered committer
* will commit the file with the exact same data as this committer would commit
* it.
*/
CommitRecoverable getRecoverable();
}
}
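The split between commit() and commitAfterRecovery() matters on failover: a retried commit must not fail merely because an earlier attempt already published the file. A hedged sketch of the retry path (the helper method is hypothetical; the calls are the ones declared above):

// Recovery side: re-create the committer from the checkpointed handle
// and publish idempotently.
static void publishAfterFailover(
        RecoverableWriter writer,
        RecoverableWriter.CommitRecoverable commitState) throws IOException {
    RecoverableFsDataOutputStream.Committer committer = writer.recoverForCommit(commitState);
    // tolerates the case where a previous attempt already committed the file
    committer.commitAfterRecovery();
}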
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
/**
* The RecoverableWriter creates and recovers {@link RecoverableFsDataOutputStream}.
* It can be used to write data to a file system in a way that allows the writing to be
* resumed consistently after a failure and recovery, without loss of data or
* duplication of bytes.
*
* <p>The streams do not make the files they write to immediately visible, but instead write
* to temp files or other temporary storage. To publish the data atomically in the
* end, the stream offers the {@link RecoverableFsDataOutputStream#closeForCommit()} method
* to create a committer that publishes the result.
*
* <p>These writers are useful in the context of checkpointing. The example below illustrates
* how to use them:
*
* <pre>{@code
* // --------- initial run --------
* RecoverableWriter writer = fileSystem.createRecoverableWriter();
* RecoverableFsDataOutputStream out = writer.open(path);
* out.write(...);
*
* // persist intermediate state
* ResumeRecoverable intermediateState = out.persist();
* storeInCheckpoint(intermediateState);
*
* // --------- recovery --------
* ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
* RecoverableWriter writer = fileSystem.createRecoverableWriter();
* RecoverableFsDataOutputStream out = writer.recover(lastCheckpointState);
*
* out.write(...); // append more data
*
* out.closeForCommit().commit(); // close stream and publish all the data
*
* // --------- recovery without appending --------
* ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
* RecoverableWriter writer = fileSystem.createRecoverableWriter();
* Committer committer = writer.recoverForCommit(lastCheckpointState);
* committer.commit(); // publish the state as of the last checkpoint
* }</pre>
*
* <h3>Recovery</h3>
*
* <p>Recovery relies on data persistence in the target file system or object store. While the
* code itself works with the specific primitives that the target storage offers, recovery will
* fail if the data written so far was deleted by an external factor.
* For example, some implementations stage data in temp files or object parts. If these
* were deleted by someone or by an automated cleanup policy, then resuming
* may fail. This is not surprising and should be expected, but we want to explicitly point
* this out here.
*
* <p>Specific care is needed for systems like S3, where the implementation uses Multipart Uploads
* to incrementally upload and persist parts of the result. The timeouts for Multipart Uploads
* and the lifetime of parts in unfinished Multipart Uploads must be set high enough in the
* bucket policy to accommodate the recovery. These values are typically on the order of days,
* so regular recovery is usually not a problem. What can become an issue are situations where
* a Flink application is hard-killed (all processes or containers removed) and one then tries
* to manually recover the application from an externalized checkpoint some days later. In that
* case, systems like S3 may have already removed the uncommitted parts, and recovery will not succeed.
*
* <h3>Implementer's Note</h3>
*
* <p>From the perspective of the implementer, it would be desirable to make this class
* generic with respect to the concrete types of 'CommitRecoverable' and 'ResumeRecoverable'.
* However, we found that this makes the code more clumsy to use and we hence dropped the
* generics at the cost of doing some explicit casts in the implementation that would
* otherwise have been implicitly generated by the generics compiler.
*/
public interface RecoverableWriter {
/**
* Opens a new recoverable stream to write to the given path.
* Whether existing files will be overwritten is implementation specific and should
* not be relied upon.
*
* @param path The path of the file/object to write to.
* @return A new RecoverableFsDataOutputStream writing a new file/object.
*
* @throws IOException Thrown if the stream could not be opened/initialized.
*/
RecoverableFsDataOutputStream open(Path path) throws IOException;
/**
* Resumes a recoverable stream consistently at the point indicated by the given ResumeRecoverable.
* Future writes to the stream will continue / append to the file as of that point.
*
* <p>This method is optional and whether it is supported is indicated through the
* {@link #supportsResume()} method.
*
* @param resumable The opaque handle with the recovery information.
* @return A recoverable stream writing to the file/object as it was at the point when the
* ResumeRecoverable was created.
*
* @throws IOException Thrown, if resuming fails.
* @throws UnsupportedOperationException Thrown if this optional method is not supported.
*/
RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
/**
* Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
* for finalizing and committing. This will publish the target file with exactly the data
* that was written up to the point when the CommitRecoverable was created.
*
* @param resumable The opaque handle with the recovery information.
* @return A committer that publishes the target file.
*
* @throws IOException Thrown, if recovery fails.
*/
RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException;
/**
* The serializer for the CommitRecoverable types created in this writer.
* This serializer should be used to store the CommitRecoverable in checkpoint
* state or other forms of persistent state.
*/
SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
/**
* The serializer for the ResumeRecoverable types created in this writer.
* This serializer should be used to store the ResumeRecoverable in checkpoint
* state or other forms of persistent state.
*/
SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
/**
* Checks whether the writer and its streams support resuming (appending to) files after
* recovery (via the {@link #recover(ResumeRecoverable)} method).
*
* <p>If true, then this writer supports the {@link #recover(ResumeRecoverable)} method.
* If false, then that method may not be supported and streams can only be recovered via
* {@link #recoverForCommit(CommitRecoverable)}.
*/
boolean supportsResume();
// ------------------------------------------------------------------------
/**
* A handle to an in-progress stream with a defined and persistent amount of data.
* The handle can be used to recover the stream as of exactly that point and
* publish the result file.
*/
interface CommitRecoverable {}
/**
* A handle to an in-progress stream with a defined and persistent amount of data.
* The handle can be used to recover the stream exactly as of that point and either
* publish the result file or keep appending data to the stream.
*/
interface ResumeRecoverable extends CommitRecoverable {}
}
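The serializer accessors exist because the recoverables are opaque: to survive a checkpoint they must be turned into bytes, and the serializer version has to be stored alongside so that deserialize(version, bytes) can be driven on restore. A self-contained sketch of that framing (the helper methods are hypothetical and the manual version-prefix framing is an assumption; only the interface methods declared above plus java.nio.ByteBuffer are used):

// Serialize a ResumeRecoverable together with the serializer version.
static byte[] toCheckpointBytes(RecoverableWriter writer, ResumeRecoverable resumable) throws IOException {
    final SimpleVersionedSerializer<ResumeRecoverable> serializer = writer.getResumeRecoverableSerializer();
    final byte[] data = serializer.serialize(resumable);
    return ByteBuffer.allocate(4 + data.length)
            .putInt(serializer.getVersion())
            .put(data)
            .array();
}

// Restore the ResumeRecoverable, dispatching on the stored version.
static ResumeRecoverable fromCheckpointBytes(RecoverableWriter writer, byte[] bytes) throws IOException {
    final ByteBuffer buffer = ByteBuffer.wrap(bytes);
    final int version = buffer.getInt();
    final byte[] data = new byte[buffer.remaining()];
    buffer.get(data);
    return writer.getResumeRecoverableSerializer().deserialize(version, data);
}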
@@ -142,11 +142,9 @@ public class LocalFileSystem extends FileSystem {
return new LocalDataInputStream(file);
}
-private File pathToFile(Path path) {
-if (!path.isAbsolute()) {
-path = new Path(getWorkingDirectory(), path);
-}
-return new File(path.toUri().getPath());
+@Override
+public LocalRecoverableWriter createRecoverableWriter() throws IOException {
+return new LocalRecoverableWriter(this);
}
@Override
@@ -308,6 +306,20 @@ public class LocalFileSystem extends FileSystem {
// ------------------------------------------------------------------------
/**
* Converts the given Path to a File for this file system.
*
* <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
*/
public File pathToFile(Path path) {
if (!path.isAbsolute()) {
path = new Path(getWorkingDirectory(), path);
}
return new File(path.toUri().getPath());
}
// ------------------------------------------------------------------------
/**
* Gets the URI that represents the local file system.
* That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs.local;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import java.io.File;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of the resume and commit descriptor objects for local recoverable streams.
*/
@Internal
class LocalRecoverable implements CommitRecoverable, ResumeRecoverable {
/** The file path for the final result file. */
private final File targetFile;
/** The file path of the staging file. */
private final File tempFile;
/** The position to resume from. */
private final long offset;
/**
* Creates a resumable for the given file at the given position.
*
* @param targetFile The file to resume.
* @param offset The position to resume from.
*/
LocalRecoverable(File targetFile, File tempFile, long offset) {
checkArgument(offset >= 0, "offset must be >= 0");
this.targetFile = checkNotNull(targetFile, "targetFile");
this.tempFile = checkNotNull(tempFile, "tempFile");
this.offset = offset;
}
public File targetFile() {
return targetFile;
}
public File tempFile() {
return tempFile;
}
public long offset() {
return offset;
}
@Override
public String toString() {
return "LocalRecoverable " + tempFile + " @ " + offset + " -> " + targetFile;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs.local;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link RecoverableFsDataOutputStream} for the {@link LocalFileSystem}.
*/
@Internal
class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
private final File targetFile;
private final File tempFile;
private final FileOutputStream fos;
LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
this.fos = new FileOutputStream(tempFile);
}
LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
this.targetFile = checkNotNull(resumable.targetFile());
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
throw new FileNotFoundException("File Not Found: " + tempFile.getName());
}
if (tempFile.length() < resumable.offset()) {
throw new IOException("Missing data in tmp file: " + tempFile.getName());
}
this.fos = new FileOutputStream(this.tempFile, true);
this.fos.getChannel().truncate(resumable.offset());
}
@Override
public void write(int b) throws IOException {
fos.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
fos.write(b, off, len);
}
@Override
public void flush() throws IOException {
fos.flush();
}
@Override
public void sync() throws IOException {
fos.getFD().sync();
}
@Override
public long getPos() throws IOException {
return fos.getChannel().position();
}
@Override
public ResumeRecoverable persist() throws IOException {
// we call both flush and sync in order to ensure persistence on mounted
// file systems, like NFS, EBS, EFS, ...
flush();
sync();
return new LocalRecoverable(targetFile, tempFile, getPos());
}
@Override
public Committer closeForCommit() throws IOException {
final long pos = getPos();
close();
return new LocalCommitter(new LocalRecoverable(targetFile, tempFile, pos));
}
@Override
public void close() throws IOException {
fos.close();
}
// ------------------------------------------------------------------------
static class LocalCommitter implements Committer {
private final LocalRecoverable recoverable;
LocalCommitter(LocalRecoverable recoverable) {
this.recoverable = checkNotNull(recoverable);
}
@Override
public void commit() throws IOException {
final File src = recoverable.tempFile();
final File dest = recoverable.targetFile();
// sanity check
if (src.length() != recoverable.offset()) {
// something was done to this file since the committer was created.
// this is not the "clean" case
throw new IOException("Cannot clean commit: File has trailing junk data.");
}
// rather than fall into default recovery, handle errors explicitly
// in order to improve error messages
try {
Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
catch (UnsupportedOperationException | AtomicMoveNotSupportedException e) {
if (!src.renameTo(dest)) {
throw new IOException("Committing file failed, could not rename " + src + " -> " + dest);
}
}
catch (FileAlreadyExistsException e) {
throw new IOException("Committing file failed. Target file already exists: " + dest);
}
}
@Override
public void commitAfterRecovery() throws IOException {
final File src = recoverable.tempFile();
final File dest = recoverable.targetFile();
final long expectedLength = recoverable.offset();
if (src.exists()) {
if (src.length() > expectedLength) {
// can happen if we go from persist directly to recovering for commit
// truncate the trailing junk away
try (FileOutputStream fos = new FileOutputStream(src, true)) {
fos.getChannel().truncate(expectedLength);
}
}
// source still exists, so no renaming happened yet. do it!
Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
else if (!dest.exists()) {
// neither exists - that can be a sign of
// - (1) a serious problem (file system loss of data)
// - (2) a recovery from a savepoint that is some time old, where the user
// removed the files in the meantime.
// TODO how to handle this?
// We probably need an option for users whether this should log,
// or result in an exception or unrecoverable exception
}
}
@Override
public CommitRecoverable getRecoverable() {
return recoverable;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs.local;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* Simple serializer for the {@link LocalRecoverable}.
*/
@Internal
class LocalRecoverableSerializer implements SimpleVersionedSerializer<LocalRecoverable> {
static final LocalRecoverableSerializer INSTANCE = new LocalRecoverableSerializer();
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final int MAGIC_NUMBER = 0x1e744b57;
/**
* Do not instantiate, use reusable {@link #INSTANCE} instead.
*/
private LocalRecoverableSerializer() {}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(LocalRecoverable obj) throws IOException {
final byte[] targetFileBytes = obj.targetFile().getAbsolutePath().getBytes(CHARSET);
final byte[] tempFileBytes = obj.tempFile().getAbsolutePath().getBytes(CHARSET);
final byte[] targetBytes = new byte[20 + targetFileBytes.length + tempFileBytes.length];
ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(MAGIC_NUMBER);
bb.putLong(obj.offset());
bb.putInt(targetFileBytes.length);
bb.putInt(tempFileBytes.length);
bb.put(targetFileBytes);
bb.put(tempFileBytes);
return targetBytes;
}
@Override
public LocalRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
private static LocalRecoverable deserializeV1(byte[] serialized) throws IOException {
final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
if (bb.getInt() != MAGIC_NUMBER) {
throw new IOException("Corrupt data: Unexpected magic number.");
}
final long offset = bb.getLong();
final byte[] targetFileBytes = new byte[bb.getInt()];
final byte[] tempFileBytes = new byte[bb.getInt()];
bb.get(targetFileBytes);
bb.get(tempFileBytes);
final String targetPath = new String(targetFileBytes, CHARSET);
final String tempPath = new String(tempFileBytes, CHARSET);
return new LocalRecoverable(new File(targetPath), new File(tempPath), offset);
}
}
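The wire format is a fixed 20-byte little-endian header (4-byte magic number, 8-byte offset, and two 4-byte path lengths) followed by the UTF-8 bytes of both paths. A round-trip sketch, e.g. from package-private test code (the file names are hypothetical):

// The deserialized recoverable mirrors the original.
LocalRecoverable original = new LocalRecoverable(
        new File("/data/out/part-0"),
        new File("/data/out/.part-0.inprogress.8c2b0a"),
        42L);
byte[] bytes = LocalRecoverableSerializer.INSTANCE.serialize(original);
LocalRecoverable copy = LocalRecoverableSerializer.INSTANCE.deserialize(
        LocalRecoverableSerializer.INSTANCE.getVersion(), bytes);
// copy.targetFile(), copy.tempFile() and copy.offset() now equal the originals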
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs.local;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link RecoverableWriter} for the {@link LocalFileSystem}.
*/
@Internal
public class LocalRecoverableWriter implements RecoverableWriter {
private final LocalFileSystem fs;
public LocalRecoverableWriter(LocalFileSystem fs) {
this.fs = checkNotNull(fs);
}
@Override
public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
final File targetFile = fs.pathToFile(filePath);
final File tempFile = generateStagingTempFilePath(targetFile);
// try to create the parent
final File parent = tempFile.getParentFile();
if (parent != null && !parent.mkdirs() && !parent.exists()) {
throw new IOException("Failed to create the parent directory: " + parent);
}
return new LocalRecoverableFsDataOutputStream(targetFile, tempFile);
}
@Override
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
if (recoverable instanceof LocalRecoverable) {
return new LocalRecoverableFsDataOutputStream((LocalRecoverable) recoverable);
}
else {
throw new IllegalArgumentException(
"LocalFileSystem cannot recover recoverable for other file system: " + recoverable);
}
}
@Override
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
if (recoverable instanceof LocalRecoverable) {
return new LocalRecoverableFsDataOutputStream.LocalCommitter((LocalRecoverable) recoverable);
}
else {
throw new IllegalArgumentException(
"LocalFileSystem cannot recover recoverable for other file system: " + recoverable);
}
}
@Override
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
@SuppressWarnings("unchecked")
SimpleVersionedSerializer<CommitRecoverable> typedSerializer = (SimpleVersionedSerializer<CommitRecoverable>)
(SimpleVersionedSerializer<?>) LocalRecoverableSerializer.INSTANCE;
return typedSerializer;
}
@Override
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
@SuppressWarnings("unchecked")
SimpleVersionedSerializer<ResumeRecoverable> typedSerializer = (SimpleVersionedSerializer<ResumeRecoverable>)
(SimpleVersionedSerializer<?>) LocalRecoverableSerializer.INSTANCE;
return typedSerializer;
}
@Override
public boolean supportsResume() {
return true;
}
@VisibleForTesting
static File generateStagingTempFilePath(File targetFile) {
checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
checkArgument(!targetFile.isDirectory(), "targetFile must not be a directory");
final File parent = targetFile.getParentFile();
final String name = targetFile.getName();
checkArgument(parent != null, "targetFile must not be the root directory");
while (true) {
File candidate = new File(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
if (!candidate.exists()) {
return candidate;
}
}
}
}
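End to end, the local implementation follows the lifecycle described in the RecoverableWriter Javadoc. A hedged sketch (paths hypothetical; LocalFileSystem.getSharedInstance() is assumed from Flink's existing API):

// Write, persist, recover, and atomically publish on the local file system.
LocalFileSystem localFs = LocalFileSystem.getSharedInstance();
RecoverableWriter writer = localFs.createRecoverableWriter();

RecoverableFsDataOutputStream out = writer.open(new Path("file:///tmp/output/part-0"));
out.write(new byte[] {1, 2, 3});
ResumeRecoverable persisted = out.persist(); // durable up to here; store in checkpoint state

// --- after a failure: a fresh writer recovers from the checkpointed handle ---
RecoverableWriter recoveredWriter = localFs.createRecoverableWriter();
RecoverableFsDataOutputStream resumed = recoveredWriter.recover(persisted);
resumed.write(new byte[] {4, 5, 6});
resumed.closeForCommit().commit(); // renames the staging file to /tmp/output/part-0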
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.net.URI;
@@ -193,11 +194,19 @@ public class HadoopFileSystem extends FileSystem {
return fsKind;
}
@Override
public RecoverableWriter createRecoverableWriter() throws IOException {
// This writer is only supported on a subset of file systems, and on
// specific versions. We check these schemes and versions eagerly for better error
// messages in the constructor of the writer.
return new HadoopRecoverableWriter(fs);
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-private static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.hadoop.fs.Path;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of the resume and commit descriptor objects for Hadoop's
* file system abstraction.
*/
class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
/** The file path for the final result file. */
private final Path targetFile;
/** The file path of the staging file. */
private final Path tempFile;
/** The position to resume from. */
private final long offset;
/**
* Creates a resumable for the given file at the given position.
*
* @param targetFile The file to resume.
* @param offset The position to resume from.
*/
HadoopFsRecoverable(Path targetFile, Path tempFile, long offset) {
checkArgument(offset >= 0, "offset must be >= 0");
this.targetFile = checkNotNull(targetFile, "targetFile");
this.tempFile = checkNotNull(tempFile, "tempFile");
this.offset = offset;
}
public Path targetFile() {
return targetFile;
}
public Path tempFile() {
return tempFile;
}
public long offset() {
return offset;
}
@Override
public String toString() {
return "HadoopFsRecoverable " + tempFile + " @ " + offset + " -> " + targetFile;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
private static final long LEASE_TIMEOUT = 100000L;
private static Method truncateHandle;
private final FileSystem fs;
private final Path targetFile;
private final Path tempFile;
private final FSDataOutputStream out;
HadoopRecoverableFsDataOutputStream(
FileSystem fs,
Path targetFile,
Path tempFile) throws IOException {
ensureTruncateInitialized();
this.fs = checkNotNull(fs);
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
this.out = fs.create(tempFile);
}
HadoopRecoverableFsDataOutputStream(
FileSystem fs,
HadoopFsRecoverable recoverable) throws IOException {
ensureTruncateInitialized();
this.fs = checkNotNull(fs);
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
// getFileStatus will throw a FileNotFoundException if the file is not there.
final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
if (tmpFileStatus.getLen() < recoverable.offset()) {
throw new IOException("Missing data in tmp file: " + tempFile.getName());
}
// truncate back and append
truncate(fs, tempFile, recoverable.offset());
waitUntilLeaseIsRevoked(tempFile);
out = fs.append(tempFile);
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
@Override
public void flush() throws IOException {
out.hflush();
}
@Override
public void sync() throws IOException {
out.hsync();
}
@Override
public long getPos() throws IOException {
return out.getPos();
}
@Override
public ResumeRecoverable persist() throws IOException {
sync();
return new HadoopFsRecoverable(targetFile, tempFile, getPos());
}
@Override
public Committer closeForCommit() throws IOException {
final long pos = getPos();
close();
return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
}
@Override
public void close() throws IOException {
out.close();
}
// ------------------------------------------------------------------------
// Reflection utils for truncation
// These are needed to compile against Hadoop versions before
// Hadoop 2.7, which have no truncation calls for HDFS.
// ------------------------------------------------------------------------
private static void ensureTruncateInitialized() throws FlinkRuntimeException {
if (truncateHandle == null) {
Method truncateMethod;
try {
truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
}
catch (NoSuchMethodException e) {
throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.", e);
}
if (!Modifier.isPublic(truncateMethod.getModifiers())) {
throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
}
truncateHandle = truncateMethod;
}
}
static void truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
if (truncateHandle != null) {
try {
truncateHandle.invoke(hadoopFs, file, length);
}
catch (InvocationTargetException e) {
ExceptionUtils.rethrowIOException(e.getTargetException());
}
catch (Throwable t) {
throw new IOException(
"Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
"This is most likely a dependency conflict or class loading problem.", t);
}
}
else {
throw new IllegalStateException("Truncation handle has not been initialized");
}
}
// ------------------------------------------------------------------------
// Committer
// ------------------------------------------------------------------------
/**
* Implementation of a committer for the Hadoop File System abstraction.
* This implementation commits by renaming the temp file to the final file path.
* The temp file is truncated before renaming in case there is trailing garbage data.
*/
static class HadoopFsCommitter implements Committer {
private final FileSystem fs;
private final HadoopFsRecoverable recoverable;
HadoopFsCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
this.fs = checkNotNull(fs);
this.recoverable = checkNotNull(recoverable);
}
@Override
public void commit() throws IOException {
final Path src = recoverable.tempFile();
final Path dest = recoverable.targetFile();
final long expectedLength = recoverable.offset();
final FileStatus srcStatus;
try {
srcStatus = fs.getFileStatus(src);
}
catch (IOException e) {
throw new IOException("Cannot clean commit: Staging file does not exist.");
}
if (srcStatus.getLen() != expectedLength) {
// something was done to this file since the committer was created.
// this is not the "clean" case
throw new IOException("Cannot clean commit: File has trailing junk data.");
}
final boolean renamed;
try {
renamed = fs.rename(src, dest);
}
catch (IOException e) {
throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
}
if (!renamed) {
throw new IOException("Committing file by rename failed: " + src + " to " + dest);
}
}
@Override
public void commitAfterRecovery() throws IOException {
final Path src = recoverable.tempFile();
final Path dest = recoverable.targetFile();
final long expectedLength = recoverable.offset();
FileStatus srcStatus = null;
try {
srcStatus = fs.getFileStatus(src);
}
catch (FileNotFoundException e) {
// status remains null
}
catch (IOException e) {
throw new IOException("Committing during recovery failed: Could not access status of source file.");
}
if (srcStatus != null) {
if (srcStatus.getLen() > expectedLength) {
// can happen if we go from persist directly to recovering for commit
// truncate the trailing junk away
truncate(fs, src, expectedLength);
}
}
else if (!fs.exists(dest)) {
// neither exists - that can be a sign of
// - (1) a serious problem (file system loss of data)
// - (2) a recovery from a savepoint that is some time old, where the user
// removed the files in the meantime.
// TODO how to handle this?
// We probably need an option for users whether this should log,
// or result in an exception or unrecoverable exception
}
}
@Override
public CommitRecoverable getRecoverable() {
return recoverable;
}
}
/**
* Called when resuming execution after a failure; waits until the lease
* on the file we are resuming is free.
*
* <p>The lease of the file we are resuming writing/committing to may still
* belong to the process that failed previously and whose state we are
* recovering.
*
* @param path The path to the file we want to resume writing to.
* @return True if the lease was successfully revoked, false if the lease timeout expired first.
*/
private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
Preconditions.checkState(fs instanceof DistributedFileSystem);
final DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.recoverLease(path);
boolean isclosed = dfs.isFileClosed(path);
final StopWatch sw = new StopWatch();
sw.start();
while (!isclosed) {
if (sw.getTime() > LEASE_TIMEOUT) {
break;
}
try {
Thread.sleep(500L);
} catch (InterruptedException e1) {
// ignore it
}
isclosed = dfs.isFileClosed(path);
}
return isclosed;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* Simple serializer for the {@link HadoopFsRecoverable}.
*/
@Internal
class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
static final HadoopRecoverableSerializer INSTANCE = new HadoopRecoverableSerializer();
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final int MAGIC_NUMBER = 0xd7436c5e;
/**
* Do not instantiate, use reusable {@link #INSTANCE} instead.
*/
private HadoopRecoverableSerializer() {}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(HadoopFsRecoverable obj) throws IOException {
final byte[] targetFileBytes = obj.targetFile().toString().getBytes(CHARSET);
final byte[] tempFileBytes = obj.tempFile().toString().getBytes(CHARSET);
final byte[] targetBytes = new byte[20 + targetFileBytes.length + tempFileBytes.length];
ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(MAGIC_NUMBER);
bb.putLong(obj.offset());
bb.putInt(targetFileBytes.length);
bb.putInt(tempFileBytes.length);
bb.put(targetFileBytes);
bb.put(tempFileBytes);
return targetBytes;
}
@Override
public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
private static HadoopFsRecoverable deserializeV1(byte[] serialized) throws IOException {
final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
if (bb.getInt() != MAGIC_NUMBER) {
throw new IOException("Corrupt data: Unexpected magic number.");
}
final long offset = bb.getLong();
final byte[] targetFileBytes = new byte[bb.getInt()];
final byte[] tempFileBytes = new byte[bb.getInt()];
bb.get(targetFileBytes);
bb.get(tempFileBytes);
final String targetPath = new String(targetFileBytes, CHARSET);
final String tempPath = new String(tempFileBytes, CHARSET);
return new HadoopFsRecoverable(new Path(targetPath), new Path(tempPath), offset);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.util.HadoopUtils;
import java.io.IOException;
import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of the {@link RecoverableWriter} for
* Hadoop's file system abstraction.
*/
@Internal
public class HadoopRecoverableWriter implements RecoverableWriter {
/** The Hadoop file system on which the writer operates. */
private final org.apache.hadoop.fs.FileSystem fs;
/**
* Creates a new Recoverable writer.
* @param fs The Hadoop file system on which the writer operates.
*/
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
this.fs = checkNotNull(fs);
// This writer is only supported on a subset of file systems, and on
// specific versions. We check these schemes and versions eagerly for
// better error messages.
if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
throw new UnsupportedOperationException(
"Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
}
}
@Override
public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
}
@Override
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
if (recoverable instanceof HadoopFsRecoverable) {
return new HadoopRecoverableFsDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
}
else {
throw new IllegalArgumentException(
"Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
}
}
@Override
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
if (recoverable instanceof HadoopFsRecoverable) {
return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(fs, (HadoopFsRecoverable) recoverable);
}
else {
throw new IllegalArgumentException(
"Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
}
}
@Override
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
@SuppressWarnings("unchecked")
SimpleVersionedSerializer<CommitRecoverable> typedSerializer = (SimpleVersionedSerializer<CommitRecoverable>)
(SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
return typedSerializer;
}
@Override
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
@SuppressWarnings("unchecked")
SimpleVersionedSerializer<ResumeRecoverable> typedSerializer = (SimpleVersionedSerializer<ResumeRecoverable>)
(SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
return typedSerializer;
}
@Override
public boolean supportsResume() {
return true;
}
@VisibleForTesting
static org.apache.hadoop.fs.Path generateStagingTempFilePath(
org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path targetFile) throws IOException {
checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
final org.apache.hadoop.fs.Path parent = targetFile.getParent();
final String name = targetFile.getName();
checkArgument(parent != null, "targetFile must not be the root directory");
while (true) {
org.apache.hadoop.fs.Path candidate = new org.apache.hadoop.fs.Path(
parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
if (!fs.exists(candidate)) {
return candidate;
}
}
}
}
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.util;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,4 +124,22 @@ public class HadoopUtils {
}
return false;
}
/**
* Checks if the Hadoop dependency is at least of the given version.
*/
public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
String versionString = VersionInfo.getVersion();
String[] versionParts = versionString.split("\\.");
if (versionParts.length < 2) {
throw new FlinkRuntimeException(
"Cannot determine version of Hadoop, unexpected version string: " + versionString);
}
int maj = Integer.parseInt(versionParts[0]);
int min = Integer.parseInt(versionParts[1]);
return maj > major || (maj == major && min >= minor);
}
}
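A quick worked check of the comparison logic (the version strings are hypothetical examples):

// isMinHadoopVersion(2, 7) with VersionInfo.getVersion() == "2.8.3" -> true  (major equal, minor 8 >= 7)
// isMinHadoopVersion(2, 7) with "2.6.5" -> false (minor 6 < 7)
// isMinHadoopVersion(2, 7) with "3.1.0" -> true  (major 3 > 2)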