提交 23ea197b 编写于 作者: S Stephan Ewen

[hotfix] [core] Fix remaining checkstyle issues for 'core.fs'

上级 b051a4c8
......@@ -229,7 +229,9 @@ public abstract class FileSystem {
/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
//CHECKSTYLE.OFF: StaticVariableName
private static URI DEFAULT_SCHEME;
//CHECKSTYLE.ON: StaticVariableName
// ------------------------------------------------------------------------
// Initialization
......
......@@ -30,22 +30,22 @@ import static org.apache.flink.util.Preconditions.checkState;
* When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
* obtains. The safety net has a global cleanup hook that will close all streams that were
* not properly closed.
*
*
* <p>The main thread of each Flink task, as well as the checkpointing thread are automatically guarded
* by this safety net.
*
*
* <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
* i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
* {@link Path#getFileSystem()}.
*
*
* <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
* to another thread, the safety net will close those resources once the former thread finishes.
*
*
* <p>The safety net can be used as follows:
* <pre>{@code
*
*
* class GuardedThread extends Thread {
*
*
* public void run() {
* FileSystemSafetyNet.initializeSafetyNetForThread();
* try {
......@@ -62,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public class FileSystemSafetyNet {
/** The map from thread to the safety net registry for that thread */
/** The map from thread to the safety net registry for that thread. */
private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
// ------------------------------------------------------------------------
......@@ -73,9 +73,9 @@ public class FileSystemSafetyNet {
* Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread
* that called this method will be guarded, meaning that their created streams are tracked and can
* be closed via the safety net closing hook.
*
*
* <p>This method should be called at the beginning of a thread that should be guarded.
*
*
* @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
*/
@Internal
......@@ -94,7 +94,7 @@ public class FileSystemSafetyNet {
* Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
* by safety-net-guarded file systems. After this method was called, no streams can be opened any more
* from any FileSystem instance that was obtained while the thread was guarded by the safety net.
*
*
* <p>This method should be called at the very end of a guarded thread.
*/
@Internal
......
......@@ -39,13 +39,13 @@ import java.util.Map;
/**
* This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
* the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s.
* <p>
* Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
*
* <p>Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
* GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to avoid resource leaks.
* <p>
* Other than that, it works like a normal {@link CloseableRegistry}.
* <p>
* All methods in this class are thread-safe.
*
* <p>Other than that, it works like a normal {@link CloseableRegistry}.
*
* <p>All methods in this class are thread-safe.
*/
@Internal
public class SafetyNetCloseableRegistry extends
......@@ -54,15 +54,19 @@ public class SafetyNetCloseableRegistry extends
private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
/** Lock for atomic modifications to reaper thread and registry count */
/** Lock for atomic modifications to reaper thread and registry count. */
private static final Object REAPER_THREAD_LOCK = new Object();
/** Singleton reaper thread takes care of all registries in VM */
//CHECKSTYLE.OFF: StaticVariableName
/** Singleton reaper thread takes care of all registries in VM. */
private static CloseableReaperThread REAPER_THREAD = null;
/** Global count of all instances of SafetyNetCloseableRegistry */
/** Global count of all instances of SafetyNetCloseableRegistry. */
private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
//CHECKSTYLE.ON: StaticVariableName
SafetyNetCloseableRegistry() {
super(new IdentityHashMap<>());
......@@ -166,7 +170,7 @@ public class SafetyNetCloseableRegistry extends
}
/**
* Reaper runnable collects and closes leaking resources
* Reaper runnable collects and closes leaking resources.
*/
static final class CloseableReaperThread extends Thread {
......@@ -187,7 +191,7 @@ public class SafetyNetCloseableRegistry extends
try {
while (running) {
final PhantomDelegatingCloseableRef toClose = (PhantomDelegatingCloseableRef) referenceQueue.remove();
if (toClose != null) {
try {
LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
......
......@@ -82,6 +82,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
@SuppressWarnings("deprecation")
public long getDefaultBlockSize() {
return unsafeFileSystem.getDefaultBlockSize();
}
......@@ -107,6 +108,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
@SuppressWarnings("deprecation")
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
throws IOException {
......
......@@ -18,11 +18,11 @@
package org.apache.flink.core.fs.local;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.BlockLocation;
import java.io.IOException;
/**
* Implementation of the {@link BlockLocation} interface for a local file system.
*/
......
......@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import javax.annotation.Nonnull;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
......@@ -40,9 +41,9 @@ public class LocalDataInputStream extends FSDataInputStream {
/**
* Constructs a new <code>LocalDataInputStream</code> object from a given {@link File} object.
*
*
* @param file The File the data stream is read from
*
*
* @throws IOException Thrown if the data input stream cannot be created.
*/
public LocalDataInputStream(File file) throws IOException {
......@@ -71,18 +72,18 @@ public class LocalDataInputStream extends FSDataInputStream {
public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
return this.fis.read(buffer, offset, length);
}
@Override
public void close() throws IOException {
// Accoring to javadoc, this also closes the channel
this.fis.close();
}
@Override
public int available() throws IOException {
return this.fis.available();
}
@Override
public long skip(final long n) throws IOException {
return this.fis.skip(n);
......
......@@ -18,13 +18,13 @@
package org.apache.flink.core.fs.local;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataOutputStream;
/**
* The <code>LocalDataOutputStream</code> class is a wrapper class for a data
* output stream to the local file system.
......@@ -37,7 +37,7 @@ public class LocalDataOutputStream extends FSDataOutputStream {
/**
* Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
*
*
* @param file
* the {@link File} object the data stream is read from
* @throws IOException
......@@ -62,7 +62,6 @@ public class LocalDataOutputStream extends FSDataOutputStream {
fos.close();
}
@Override
public void flush() throws IOException {
fos.flush();
......
......@@ -16,16 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.core.fs.local;
import java.io.File;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import java.io.File;
/**
* The class <code>LocalFileStatus</code> provides an implementation of the {@link FileStatus} interface
* for the local file system.
......@@ -45,7 +44,7 @@ public class LocalFileStatus implements FileStatus {
/**
* Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
*
*
* @param f
* the {@link File} object this <code>LocalFileStatus</code> refers to
* @param fs
......
......@@ -16,10 +16,10 @@
* limitations under the License.
*/
/**
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
/*
* Parts of earlier versions of this file were based on source code from the
* Hadoop Project (http://hadoop.apache.org/), licensed by the Apache Software Foundation (ASF)
* under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
......@@ -65,7 +65,7 @@ public class LocalFileSystem extends FileSystem {
/** The URI representing the local file system. */
private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
/** The shared instance of the local file system */
/** The shared instance of the local file system. */
private static final LocalFileSystem INSTANCE = new LocalFileSystem();
/** Path pointing to the current working directory.
......@@ -73,10 +73,10 @@ public class LocalFileSystem extends FileSystem {
private final String workingDir;
/** Path pointing to the current working directory.
* Because Paths are not immutable, we cannot cache the proper path here */
* Because Paths are not immutable, we cannot cache the proper path here. */
private final String homeDir;
/** The host name of this machine */
/** The host name of this machine. */
private final String hostName;
/**
......@@ -112,7 +112,7 @@ public class LocalFileSystem extends FileSystem {
}
else {
throw new FileNotFoundException("File " + f + " does not exist or the user running "
+ "Flink ('"+System.getProperty("user.name")+"') has insufficient permissions to access it.");
+ "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
}
}
......@@ -149,7 +149,6 @@ public class LocalFileSystem extends FileSystem {
return new File(path.toUri().getPath());
}
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
......@@ -175,7 +174,6 @@ public class LocalFileSystem extends FileSystem {
return results;
}
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
......@@ -233,7 +231,7 @@ public class LocalFileSystem extends FileSystem {
public boolean mkdirs(final Path f) throws IOException {
final File p2f = pathToFile(f);
if(p2f.isDirectory()) {
if (p2f.isDirectory()) {
return true;
}
......
......@@ -32,6 +32,9 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
/**
* Tests for the {@link AbstractCloseableRegistry}.
*/
public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
protected ProducerThread[] streamOpenThreads;
......@@ -140,8 +143,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
try {
closeableRegistry.registerCloseable(testCloseable);
Assert.fail("Closed registry should not accept closeables!");
}catch (IOException ignore) {
}
} catch (IOException ignored) {}
blockCloseLatch.trigger();
closer.join();
......@@ -151,7 +153,10 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
}
protected static abstract class ProducerThread<C extends Closeable, T> extends Thread {
/**
* A testing producer.
*/
protected abstract static class ProducerThread<C extends Closeable, T> extends Thread {
protected final AbstractCloseableRegistry<C, T> registry;
protected final AtomicInteger refCount;
......@@ -188,6 +193,9 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
}
}
/**
* Testing stream which adds itself to a reference counter while not closed.
*/
protected static final class TestStream extends FSDataInputStream {
protected AtomicInteger refCount;
......
......@@ -24,6 +24,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests for the {@link CloseableRegistry}.
*/
public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object> {
@Override
......
......@@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.WrappingProxyUtil;
import org.junit.Test;
import java.io.IOException;
......@@ -27,6 +29,9 @@ import java.net.URISyntaxException;
import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link FileSystem} base class.
*/
public class FileSystemTest {
@Test
......
......@@ -28,7 +28,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
......@@ -40,9 +39,12 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.locks.ReentrantLock;
import static org.powermock.api.mockito.PowerMockito.*;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
* A test validating that the initialization of local output paths is properly synchronized.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(LocalFileSystem.class)
public class InitOutputPathTest {
......@@ -79,7 +81,7 @@ public class InitOutputPathTest {
@Test
public void testProperSynchronized() throws Exception {
// in the synchronized variant, we cannot use the "await latches" because not
// in the synchronized variant, we cannot use the "await latches" because not
// both threads can make process interleaved (due to the synchronization)
// the test uses sleeps (rather than latches) to produce the same interleaving.
// while that is not guaranteed to produce the pathological interleaving,
......@@ -121,7 +123,7 @@ public class InitOutputPathTest {
});
final LocalFileSystem fs1 = new SyncedFileSystem(
deleteAwaitLatch1, mkdirsAwaitLatch1,
deleteAwaitLatch1, mkdirsAwaitLatch1,
deleteTriggerLatch1, mkdirsTriggerLatch1);
final LocalFileSystem fs2 = new SyncedFileSystem(
......
......@@ -28,7 +28,6 @@ import java.io.IOException;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
......
......@@ -627,7 +627,7 @@ public class LimitedConnectionsFileSystemTest {
}
}
private static abstract class BlockingThread extends CheckedThread {
private abstract static class BlockingThread extends CheckedThread {
private final OneShotLatch waiter = new OneShotLatch();
......
......@@ -15,13 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for the {@link Path} class.
*/
public class PathTest {
@Test
......@@ -70,23 +80,23 @@ public class PathTest {
assertEquals("/C:/my/windows/path", p.toUri().getPath());
try {
new Path((String)null);
new Path((String) null);
fail();
} catch(Exception e) {
} catch (Exception e) {
// exception expected
}
try {
new Path("");
fail();
} catch(Exception e) {
} catch (Exception e) {
// exception expected
}
try {
new Path(" ");
fail();
} catch(Exception e) {
} catch (Exception e) {
// exception expected
}
......
......@@ -32,6 +32,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests for the {@link SafetyNetCloseableRegistry}.
*/
public class SafetyNetCloseableRegistryTest
extends AbstractCloseableRegistryTest<WrappingProxyCloseable<? extends Closeable>,
SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
......@@ -44,9 +47,7 @@ public class SafetyNetCloseableRegistryTest
return new WrappingProxyCloseable<Closeable>() {
@Override
public void close() throws IOException {
}
public void close() throws IOException {}
@Override
public Closeable getWrappedDelegate() {
......
......@@ -18,25 +18,13 @@
package org.apache.flink.core.fs.local;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.FileUtils;
import org.junit.Assume;
......@@ -44,6 +32,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* This class tests the functionality of the {@link LocalFileSystem} class in its components. In particular,
* file/directory access, creation, deletion, read, write is tested.
......
......@@ -111,14 +111,6 @@ under the License.
files="(.*)test[/\\](.*)configuration[/\\](.*)"
checks="AvoidStarImport"/>
<suppress
files="(.*)core[/\\]fs[/\\](.*)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
<suppress
files="(.*)test[/\\](.*)core[/\\]fs[/\\](.*)"
checks="AvoidStarImport"/>
<suppress
files="(.*)core[/\\]io[/\\](.*)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册