From 80d1c06dc04937a47e0853309e0add63903023b8 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 17 Mar 2017 15:32:23 +0100 Subject: [PATCH] [FLINK-6107] Custom checkstyle for flink-streaming-java This is based on the Apache Beam checkstyle with some custom modifications for making it fit the exisiting Flink code base. The most notable change is that Flink uses Tabs for indentation while Beam uses spaces. This adds a lot of checks that are commented out because they require non-trivial changes to the code base. There are some trivial code changes for good practices checkstyle rules that where only broken in very few places. --- flink-streaming-java/pom.xml | 35 ++ .../collector/selector/OutputSelector.java | 2 +- .../streaming/api/datastream/DataStream.java | 2 +- .../source/ContinuousFileReaderOperator.java | 2 +- .../MessageAcknowledgingSourceBase.java | 4 +- .../windowing/delta/DeltaFunction.java | 2 +- .../windowing/delta/extractor/Extractor.java | 2 +- .../flink/streaming/api/graph/StreamEdge.java | 8 +- .../api/graph/StreamGraphGenerator.java | 2 +- .../flink/streaming/api/graph/StreamNode.java | 2 +- .../api/operators/StreamSourceContexts.java | 2 +- .../streaming/api/operators/Triggerable.java | 2 +- .../runtime/operators/windowing/KeyMap.java | 4 +- .../operators/windowing/WindowOperator.java | 2 +- .../runtime/tasks/ProcessingTimeCallback.java | 2 +- .../util/typeutils/FieldAccessor.java | 4 +- .../util/typeutils/FieldAccessorFactory.java | 6 +- tools/maven/strict-checkstyle.xml | 550 ++++++++++++++++++ 18 files changed, 609 insertions(+), 24 deletions(-) create mode 100644 tools/maven/strict-checkstyle.xml diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 950c923fc4d..72c2133722a 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -123,6 +123,41 @@ under the License. + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + com.puppycrawl.tools + checkstyle + 6.19 + + + + ../tools/maven/strict-checkstyle.xml + ../tools/maven/suppressions.xml + true + true + + + + + test-compile + + check + + + + diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java index 6632da48a12..126effef275 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java @@ -42,5 +42,5 @@ public interface OutputSelector extends Serializable { * @param value * Output object for which the output selection should be made. */ - public Iterable select(OUT value); + Iterable select(OUT value); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 71ef048d6ef..fc70416e7d9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1147,7 +1147,7 @@ public class DataStream { // configure the type if needed if (sinkFunction instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() ); + ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig()); } StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index b86d97c476a..6e8d01d3212 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -367,7 +367,7 @@ public class ContinuousFileReaderOperator extends AbstractStreamOperator getReaderState() throws IOException { List snapshot = new ArrayList<>(this.pendingSplits.size()); - if (currentSplit != null ) { + if (currentSplit != null) { if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 035b7ddd1c9..e835070d0fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -178,9 +178,9 @@ public abstract class MessageAcknowledgingSourceBase /** * This method must be implemented to acknowledge the given set of IDs back to the message queue. - * @param UIds The list od IDs to acknowledge. + * @param uIds The list od IDs to acknowledge. */ - protected abstract void acknowledgeIDs(long checkpointId, List UIds); + protected abstract void acknowledgeIDs(long checkpointId, List uIds); /** * Adds an ID to be stored with the current checkpoint. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java index b94e453c07c..d9295c42a72 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java @@ -42,6 +42,6 @@ public interface DeltaFunction extends Serializable { * the new data point. * @return the delta between the two given points. */ - public double getDelta(DATA oldDataPoint, DATA newDataPoint); + double getDelta(DATA oldDataPoint, DATA newDataPoint); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java index a94f09a2fc8..0b7e4da0005 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java @@ -41,6 +41,6 @@ public interface Extractor extends Serializable { * the input data * @return the extracted/converted data */ - public TO extract(FROM in); + TO extract(FROM in); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index 8e1c36165ca..94e624ff4e9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -34,15 +34,15 @@ public class StreamEdge implements Serializable { private static final long serialVersionUID = 1L; - final private String edgeId; + private final String edgeId; - final private StreamNode sourceVertex; - final private StreamNode targetVertex; + private final StreamNode sourceVertex; + private final StreamNode targetVertex; /** * The type number of the input for co-tasks. */ - final private int typeNumber; + private final int typeNumber; /** * A list of output names that the target vertex listens to (if there is diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index f9eec4f10ca..3b9a506cfb4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -369,7 +369,7 @@ public class StreamGraphGenerator { iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), - iterate.getPreferredResources() ); + iterate.getPreferredResources()); StreamNode itSource = itSourceAndSink.f0; StreamNode itSink = itSourceAndSink.f1; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index e4164bad65e..71f2a4a4e1b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -40,7 +40,7 @@ public class StreamNode implements Serializable { private static final long serialVersionUID = 1L; - transient private StreamExecutionEnvironment env; + private transient StreamExecutionEnvironment env; private final int id; private Integer parallelism = null; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index e4d051c01f0..36d10d3c980 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -336,7 +336,7 @@ public class StreamSourceContexts { * toggles the status. ACTIVE status resumes as soon as some record or watermark is collected * again. */ - private static abstract class WatermarkContext implements SourceFunction.SourceContext { + private abstract static class WatermarkContext implements SourceFunction.SourceContext { protected final ProcessingTimeService timeService; protected final Object checkpointLock; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java index 36e9ad19524..48b18b33057 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java @@ -36,5 +36,5 @@ public interface Triggerable { /** * Invoked when a processing-time timer fires. */ - void onProcessingTime(InternalTimer timer) throws Exception ; + void onProcessingTime(InternalTimer timer) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java index 52c321c731a..b0f4f46949b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java @@ -606,7 +606,7 @@ public class KeyMap implements Iterable> { * * @param The type created by the factory. */ - public static interface LazyFactory { + public interface LazyFactory { /** * The factory method; creates the value. @@ -625,7 +625,7 @@ public class KeyMap implements Iterable> { * @param The type of the key. * @param The type of the value. */ - public static interface TraversalEvaluator { + public interface TraversalEvaluator { /** * Called whenever the traversal starts with a new key. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 3d407167b5c..70692517960 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -310,7 +310,7 @@ public class WindowOperator final TupleSerializer> tupleSerializer = new TupleSerializer<>( typedTuple, - new TypeSerializer[] {windowSerializer, windowSerializer} ); + new TypeSerializer[] {windowSerializer, windowSerializer}); final ListStateDescriptor> mergingSetsStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java index 035939f7b3b..7588c3d54c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java @@ -36,5 +36,5 @@ public interface ProcessingTimeCallback { * * @param timestamp The timestamp for which the trigger event was scheduled. */ - void onProcessingTime(long timestamp) throws Exception ; + void onProcessingTime(long timestamp) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java index 2828308589c..21f477ae2ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java @@ -92,7 +92,7 @@ public abstract class FieldAccessor implements Serializable { * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a * field of a POJO that is itself some composite type but is not further decomposed) */ - final static class SimpleFieldAccessor extends FieldAccessor { + static final class SimpleFieldAccessor extends FieldAccessor { private static final long serialVersionUID = 1L; @@ -113,7 +113,7 @@ public abstract class FieldAccessor implements Serializable { } } - final static class ArrayFieldAccessor extends FieldAccessor { + static final class ArrayFieldAccessor extends FieldAccessor { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java index 6dbeedd6ab4..1bfce5ba546 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java @@ -198,9 +198,9 @@ public class FieldAccessorFactory implements Serializable { // -------------------------------------------------------------------------------------------------- - private final static String REGEX_FIELD = "[\\p{L}\\p{Digit}_\\$]*"; // This can start with a digit (because of Tuples) - private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; - private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + private static final String REGEX_FIELD = "[\\p{L}\\p{Digit}_\\$]*"; // This can start with a digit (because of Tuples) + private static final String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private static final String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS +"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR +"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA; diff --git a/tools/maven/strict-checkstyle.xml b/tools/maven/strict-checkstyle.xml new file mode 100644 index 00000000000..ba2f4be6efe --- /dev/null +++ b/tools/maven/strict-checkstyle.xmlitLab