From 7e5a97062c8d5d322be81a9d8af35f942a929f57 Mon Sep 17 00:00:00 2001
From: Johannes
Date: Tue, 5 May 2015 17:32:41 +0200
Subject: [PATCH] [FLINK-1974] Fix getNetRuntime() of JobExecutionResult and add documentation

- Fix JobInfo to report milliseconds
- Added documentation to indicate that the return value is in milliseconds
- Added a getNetRuntime method which accepts a desired time unit for easy conversion

This closes #652
---
 .../org/apache/flink/client/CliFrontend.java         |  2 +-
 .../flink/api/common/JobExecutionResult.java         | 16 ++++++++++++++--
 .../client/SerializedJobExecutionResult.java         | 14 +++++++++++++-
 .../flink/runtime/jobmanager/JobInfo.scala           |  2 +-
 .../client/SerializedJobExecutionResultTest.java     |  4 ++++
 .../java/record/io/jdbc/example/JDBCExample.java     |  2 +-
 .../test/recordJobs/wordcount/WordCount.java         | 11 ++++++-----
 .../test/recovery/SimpleRecoveryITCase.java          |  2 +-
 .../runtime/NetworkStackThroughputITCase.java        |  3 ++-
 9 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a13b322d137..6ca8c4d08da 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -609,7 +609,7 @@ public class CliFrontend {
 			}
 			if (execResult instanceof JobExecutionResult) {
 				JobExecutionResult result = (JobExecutionResult) execResult;
-				System.out.println("Job Runtime: " + result.getNetRuntime());
+				System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
 				Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
 				if (accumulatorsResult.size() > 0) {
 					System.out.println("Accumulator Results: ");
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 62848f75b95..bf06c75fb3f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The result of a job execution. Gives access to the execution time of the job,
@@ -34,7 +35,7 @@ public class JobExecutionResult extends JobSubmissionResult {
 	 * Creates a new JobExecutionResult.
 	 *
 	 * @param jobID The job's ID.
-	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
+	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
 	 * @param accumulators A map of all accumulators produced by the job.
 	 */
 	public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
@@ -47,12 +48,23 @@ public class JobExecutionResult extends JobSubmissionResult {
 	 * Gets the net execution time of the job, i.e., the execution time in the parallel system,
 	 * without the pre-flight steps like the optimizer.
 	 *
-	 * @return The net execution time.
+	 * @return The net execution time in milliseconds.
 	 */
 	public long getNetRuntime() {
 		return this.netRuntime;
 	}
 
+	/**
+	 * Gets the net execution time of the job, i.e., the execution time in the parallel system,
+	 * without the pre-flight steps like the optimizer, converted to the desired time unit.
+	 *
+	 * @param desiredUnit the unit in which the net runtime should be returned
+	 * @return The net execution time in the desired unit.
+	 */
+	public long getNetRuntime(TimeUnit desiredUnit) {
+		return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
+	}
+
 	/**
 	 * Gets the accumulator with the given name. Returns {@code null}, if no accumulator with
 	 * that name was produced.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
index e0b1ad149d4..029bc3876f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A variant of the {@link org.apache.flink.api.common.JobExecutionResult} that holds
@@ -45,7 +46,7 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
 	 * Creates a new SerializedJobExecutionResult.
 	 *
 	 * @param jobID The job's ID.
-	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
+	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
 	 * @param accumulators A map of all accumulator results produced by the job, in serialized form
 	 */
 	public SerializedJobExecutionResult(JobID jobID, long netRuntime,
@@ -63,6 +64,17 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
 		return netRuntime;
 	}
 
+	/**
+	 * Gets the net execution time of the job, i.e., the execution time in the parallel system,
+	 * without the pre-flight steps like the optimizer, converted to the desired time unit.
+	 *
+	 * @param desiredUnit the unit in which the net runtime should be returned
+	 * @return The net execution time in the desired unit.
+	 */
+	public long getNetRuntime(TimeUnit desiredUnit) {
+		return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
+	}
+
 	public Map<String, SerializedValue<Object>> getSerializedAccumulatorResults() {
 		return this.accumulatorResults;
 	}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 4b7446c16d0..26d7272ed65 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -34,7 +34,7 @@ class JobInfo(val client: ActorRef, val start: Long){
 
   def duration: Long = {
     if(end != -1){
-      (end - start)/1000
+      end - start
     }else{
       -1
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index f58bbe18560..5c9ffa7c674 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -53,6 +54,7 @@ public class SerializedJobExecutionResultTest {
 
 			assertEquals(origJobId, cloned.getJobId());
 			assertEquals(origTime, cloned.getNetRuntime());
+			assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
 			assertEquals(origMap, cloned.getSerializedAccumulatorResults());
 
 			// convert to deserialized result
@@ -62,7 +64,9 @@ public class SerializedJobExecutionResultTest {
 
 			assertEquals(origJobId, jResult.getJobID());
 			assertEquals(origJobId, jResultCopied.getJobID());
 			assertEquals(origTime, jResult.getNetRuntime());
+			assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
 			assertEquals(origTime, jResultCopied.getNetRuntime());
+			assertEquals(origTime, jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
 
 			for (Map.Entry<String, SerializedValue<Object>> entry : origMap.entrySet()) {
 				String name = entry.getKey();
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
index 58aaf9e67b8..213fd6a7147 100644
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
@@ -86,7 +86,7 @@ public class JDBCExample implements Program, ProgramDescription {
 		prepareTestDb();
 		JDBCExample tut = new JDBCExample();
 		JobExecutionResult res = LocalExecutor.execute(tut, args);
-		System.out.println("runtime: " + res.getNetRuntime());
+		System.out.println("runtime: " + res.getNetRuntime() + " ms");
 
 		System.exit(0);
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
index 53b26637d20..96eb1fc539a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.test.recordJobs.wordcount;
 
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.record.functions.MapFunction;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.record.io.CsvOutputFormat;
 import org.apache.flink.api.java.record.io.TextInputFormat;
 import org.apache.flink.api.java.record.operators.FileDataSink;
@@ -41,6 +38,10 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Implements a word count which takes the input file and counts the number of
  * the occurrences of each word in the file.
@@ -154,6 +155,6 @@ public class WordCount implements Program, ProgramDescription {
 		// This will execute the word-count embedded in a local context. replace this line by the commented
 		// succeeding line to send the job to a local installation or to a cluster for execution
 		JobExecutionResult result = LocalExecutor.execute(plan);
-		System.err.println("Total runtime: " + result.getNetRuntime());
+		System.err.println("Total runtime: " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");
 	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index e61e5514fb6..e2f5a71591a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -96,7 +96,7 @@ public class SimpleRecoveryITCase {
 
 			try {
 				JobExecutionResult res = env.execute();
-				String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
+				String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
 				fail("The program should have failed, but returned " + msg);
 			}
 			catch (ProgramInvocationException e) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 36c7cbad50f..7b43266374f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -143,7 +144,7 @@
 			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
 
 			long dataVolumeMbit = dataVolumeGb * 8192;
-			long runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000;
+			long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
 
 			int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
 
-- 
GitLab
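
For reference, a minimal usage sketch of the new accessor from a user program. This example is not part of the patch; the class name, data set, and output path below are illustrative only, while the ExecutionEnvironment and JobExecutionResult calls are the regular Flink batch API.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class NetRuntimeExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// A trivial job, just so that execute() has something to run.
		DataSet<String> words = env.fromElements("to", "be", "or", "not", "to", "be");
		words.writeAsText("/tmp/net-runtime-example"); // illustrative output path

		// execute() returns a JobExecutionResult once the job has finished.
		JobExecutionResult result = env.execute("Net runtime example");

		// getNetRuntime() always reports milliseconds ...
		System.out.println("Job runtime: " + result.getNetRuntime() + " ms");

		// ... while the new overload converts to the requested unit.
		System.out.println("Job runtime: " + result.getNetRuntime(TimeUnit.SECONDS) + " s");
	}
}

Note that TimeUnit.convert truncates rather than rounds, so a job that finishes in under a second reports 0 when queried with TimeUnit.SECONDS.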