diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index 9b0e01947c399c009a0bfa255bc845cd6180958b..3e2e35988633641daac645b0529171be8f36bc77 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -40,7 +40,8 @@ public class AccumulatorHelper { if (ownAccumulator == null) { // Take over counter from chained task target.put(otherEntry.getKey(), otherEntry.getValue()); - } else { + } + else { // Both should have the same type AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(), ownAccumulator.getClass(), otherEntry.getValue().getClass()); @@ -122,12 +123,13 @@ public class AccumulatorHelper { return builder.toString(); } - public static void resetAndClearAccumulators( - Map> accumulators) { - for (Map.Entry> entry : accumulators.entrySet()) { - entry.getValue().resetLocal(); + public static void resetAndClearAccumulators(Map> accumulators) { + if (accumulators != null) { + for (Map.Entry> entry : accumulators.entrySet()) { + entry.getValue().resetLocal(); + } + accumulators.clear(); } - accumulators.clear(); } public static Map> copy(final Map extends AbstractInvokable i // JobManager. close() has been called earlier for all involved UDFs // (using this.stub.close() and closeChainedTasks()), so UDFs can no longer // modify accumulators; - if (this.stub != null) { - // collect the counters from the stub - if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) { - Map> accumulators = - FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); - RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); - } - } + + // collect the counters from the udf in the core driver + Map> accumulators = + FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); + + // collect accumulators from chained tasks and report them + reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); } catch (Exception ex) { // close the input, but do not report any exceptions, since we already have another root cause @@ -572,16 +573,25 @@ public class RegularPactTask extends AbstractInvokable i // We can merge here the accumulators from the stub and the chained // tasks. Type conflicts can occur here if counters with same name but // different type were used. - for (ChainedDriver chainedTask : chainedTasks) { - if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { - Map> chainedAccumulators = - FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators(); - AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); + + if (!chainedTasks.isEmpty()) { + if (accumulators == null) { + accumulators = new HashMap>(); + } + + for (ChainedDriver chainedTask : chainedTasks) { + RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null); + if (rc != null) { + Map> chainedAccumulators = rc.getAllAccumulators(); + if (chainedAccumulators != null) { + AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); + } + } } } // Don't report if the UDF didn't collect any accumulators - if (accumulators.size() == 0) { + if (accumulators == null || accumulators.size() == 0) { return; } @@ -592,9 +602,11 @@ public class RegularPactTask extends AbstractInvokable i // (e.g. in iterations) and we don't want to count twice. This may not be // done before sending AccumulatorHelper.resetAndClearAccumulators(accumulators); + for (ChainedDriver chainedTask : chainedTasks) { - if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { - AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators()); + RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null); + if (rc != null) { + AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators()); } } } @@ -1140,7 +1152,7 @@ public class RegularPactTask extends AbstractInvokable i } catch (InterruptedException iex) { throw new RuntimeException("Interrupted while waiting for input " + index + " to become available."); } catch (IOException ioex) { - throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + "."); + throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + "."); } } }