From 9365441396efdcf852e9076bdb6ca0fcc841434c Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 14 Jan 2016 22:08:41 +0100 Subject: [PATCH] [hotfix] Minor cleanup of warnings, comments, and code style in the Java API Utils --- .../java/org/apache/flink/api/java/Utils.java | 68 +++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index cb109064c76..038b58cff64 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -19,12 +19,14 @@ package org.apache.flink.api.java; import org.apache.commons.lang3.StringUtils; + import org.apache.flink.api.common.accumulators.SerializedListAccumulator; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; import java.io.IOException; import java.lang.reflect.Field; @@ -32,7 +34,6 @@ import java.lang.reflect.Modifier; import java.util.List; import java.util.Random; -import org.apache.flink.configuration.Configuration; import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis; /** @@ -63,19 +64,28 @@ public final class Utils { * * @param typeInfo {@link CompositeType} */ - public static void getContainedGenericTypes(CompositeType typeInfo, List> target) { - for(int i = 0; i < typeInfo.getArity(); i++) { + public static void getContainedGenericTypes(CompositeType typeInfo, List> target) { + for (int i = 0; i < typeInfo.getArity(); i++) { TypeInformation type = typeInfo.getTypeAt(i); - if(type instanceof CompositeType) { - getContainedGenericTypes((CompositeType) type, target); - } else if(type instanceof GenericTypeInfo) { - if(!target.contains(type)) { + if (type instanceof CompositeType) { + getContainedGenericTypes((CompositeType) type, target); + } else if (type instanceof GenericTypeInfo) { + if (!target.contains(type)) { target.add((GenericTypeInfo) type); } } } } + // -------------------------------------------------------------------------------------------- + + /** + * Utility sink function that counts elements and writes the count into an accumulator, + * from which it can be retrieved by the client. This sink is used by the + * {@link DataSet#count()} function. + * + * @param Type of elements to count. + */ @SkipCodeAnalysis public static class CountHelper extends RichOutputFormat { @@ -90,24 +100,29 @@ public final class Utils { } @Override - public void configure(Configuration parameters) { - } + public void configure(Configuration parameters) {} @Override - public void open(int taskNumber, int numTasks) throws IOException { - } + public void open(int taskNumber, int numTasks) {} @Override - public void writeRecord(T record) throws IOException { + public void writeRecord(T record) { counter++; } @Override - public void close() throws IOException { + public void close() { getRuntimeContext().getLongCounter(id).add(counter); } } + /** + * Utility sink function that collects elements into an accumulator, + * from which it they can be retrieved by the client. This sink is used by the + * {@link DataSet#collect()} function. + * + * @param Type of elements to count. + */ @SkipCodeAnalysis public static class CollectHelper extends RichOutputFormat { @@ -124,11 +139,10 @@ public final class Utils { } @Override - public void configure(Configuration parameters) { - } + public void configure(Configuration parameters) {} @Override - public void open(int taskNumber, int numTasks) throws IOException { + public void open(int taskNumber, int numTasks) { this.accumulator = new SerializedListAccumulator<>(); } @@ -138,13 +152,12 @@ public final class Utils { } @Override - public void close() throws IOException { + public void close() { // Important: should only be added in close method to minimize traffic of accumulators getRuntimeContext().addAccumulator(id, accumulator); } } - // -------------------------------------------------------------------------------------------- /** @@ -157,16 +170,16 @@ public final class Utils { private static String getSerializerTree(TypeInformation ti, int indent) { String ret = ""; - if(ti instanceof CompositeType) { + if (ti instanceof CompositeType) { ret += StringUtils.repeat(' ', indent) + ti.getClass().getSimpleName()+"\n"; CompositeType cti = (CompositeType) ti; String[] fieldNames = cti.getFieldNames(); - for(int i = 0; i < cti.getArity(); i++) { - TypeInformation fieldType = cti.getTypeAt(i); + for (int i = 0; i < cti.getArity(); i++) { + TypeInformation fieldType = cti.getTypeAt(i); ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i]+":"+getSerializerTree(fieldType, indent); } } else { - if(ti instanceof GenericTypeInfo) { + if (ti instanceof GenericTypeInfo) { ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo ("+ti.getTypeClass().getSimpleName()+")\n"; ret += getGenericTypeTree(ti.getTypeClass(), indent + 4); } else { @@ -176,14 +189,15 @@ public final class Utils { return ret; } - private static String getGenericTypeTree(Class type, int indent) { + private static String getGenericTypeTree(Class type, int indent) { String ret = ""; - for(Field field : type.getDeclaredFields()) { - if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { + for (Field field : type.getDeclaredFields()) { + if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { continue; } - ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + (field.getType().isEnum() ? " (is enum)" : "") + "\n"; - if(!field.getType().isPrimitive()) { + ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + + (field.getType().isEnum() ? " (is enum)" : "") + "\n"; + if (!field.getType().isPrimitive()) { ret += getGenericTypeTree(field.getType(), indent + 4); } } -- GitLab