提交 fdebcb83 编写于 作者: M Maximilian Michels

[FLINK-2387] add streaming test case for live accumulators

This closes #926.
上级 dba2946f
......@@ -36,7 +36,7 @@ import java.util.Collection;
* <p>Upon construction, this source function serializes the elements using Flink's type information.
* That way, any object transport using Java serialization will not be affected by the serializability
* if the elements.</p>
* of the elements.</p>
* @param <T> The type of elements returned by this function.
......@@ -24,6 +24,7 @@ import akka.actor.Status;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
......@@ -52,6 +53,8 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
......@@ -60,7 +63,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
......@@ -75,6 +77,17 @@ import static org.junit.Assert.*;
* Tests the availability of accumulator results during runtime. The test case tests a user-defined
* accumulator and Flink's internal accumulators for two consecutive tasks.
* CHAINED[Source -> Map] -> Sink
* Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
* the task to the task manager which notifies the job manager and sends the current accumulators.
* The task blocks until the test has been notified about the current accumulator values.
* A barrier between the operators ensures that that pipelining is disabled for the streaming test.
* The batch job reads the records one at a time. The streaming code buffers the records beforehand;
* that's why exact guarantees about the number of records read are very hard to make. Thus, why we
* check for an upper bound of the elements read.
public class AccumulatorLiveITCase {
......@@ -83,15 +96,17 @@ public class AccumulatorLiveITCase {
private static ActorSystem system;
private static ActorGateway jobManagerGateway;
private static ActorRef taskManager;
private static JobID jobID;
private static JobGraph jobGraph;
// name of user accumulator
private static String NAME = "test";
private static String ACCUMULATOR_NAME = "test";
// number of heartbeat intervals to check
private static final int NUM_ITERATIONS = 5;
private static List<String> inputData = new ArrayList<String>(NUM_ITERATIONS);
private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
......@@ -113,30 +128,61 @@ public class AccumulatorLiveITCase {
for (int i=0; i < NUM_ITERATIONS; i++) {
inputData.add(i, String.valueOf(i+1));
NotifyingMapper.finished = false;
public void after() throws Exception {
public void testProgram() throws Exception {
new JavaTestKit(system) {{
public void testBatch() throws Exception {
/** The program **/
ExecutionEnvironment env = new PlanExtractor();
ExecutionEnvironment env = new BatchPlanExtractor();
DataSet<String> input = env.fromCollection(inputData);
.flatMap(new WaitingUDF())
.output(new WaitingOutputFormat());
.flatMap(new NotifyingMapper())
.output(new NotifyingOutputFormat());
// Extract job graph and set job id for the task to notify of accumulator changes.
JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
jobID = jobGraph.getJobID();
public void testStreaming() throws Exception {
StreamExecutionEnvironment env = new StreamJobExtractor();
DataStream<String> input = env.fromCollection(inputData);
.flatMap(new NotifyingMapper())
.write(new NotifyingOutputFormat(), 1000).disableChaining();
jobGraph = ((StreamJobExtractor) env).graph;
jobID = jobGraph.getJobID();
private static void verifyResults() {
new JavaTestKit(system) {{
ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
// register for accumulator changes
......@@ -149,12 +195,12 @@ public class AccumulatorLiveITCase {
expectMsgClass(TIMEOUT, Status.Success.class);
ExecutionAttemptID mapperTaskID = null;
TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = msg.flinkAccumulators();
Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
ExecutionAttemptID mapperTaskID = null;
// find out the first task's execution attempt id
for (Map.Entry<ExecutionAttemptID, ?> entry : flinkAccumulators.entrySet()) {
if (entry.getValue() != null) {
......@@ -163,8 +209,19 @@ public class AccumulatorLiveITCase {
ExecutionAttemptID sinkTaskID = null;
// find the second's task id
for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
if (key != mapperTaskID) {
sinkTaskID = key;
/* Check for accumulator values */
if(checkUserAccumulators(0, userAccumulators) && checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
if(checkUserAccumulators(0, userAccumulators) &&
checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
LOG.info("Passed initial check for map task.");
} else {
fail("Wrong accumulator results when map task begins execution.");
......@@ -172,7 +229,6 @@ public class AccumulatorLiveITCase {
int expectedAccVal = 0;
ExecutionAttemptID sinkTaskID = null;
/* for mapper task */
for (int i = 1; i <= NUM_ITERATIONS; i++) {
......@@ -186,8 +242,16 @@ public class AccumulatorLiveITCase {
LOG.info("{}", flinkAccumulators);
LOG.info("{}", userAccumulators);
if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
LOG.info("Passed round " + i);
if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
LOG.info("Passed round #" + i);
} else if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
checkFlinkAccumulators(sinkTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
// we determined the wrong task id and need to switch the two here
ExecutionAttemptID temp = mapperTaskID;
mapperTaskID = sinkTaskID;
sinkTaskID = temp;
LOG.info("Passed round #" + i);
} else {
fail("Failed in round #" + i);
......@@ -197,15 +261,8 @@ public class AccumulatorLiveITCase {
flinkAccumulators = msg.flinkAccumulators();
userAccumulators = msg.userAccumulators();
// find the second's task id
for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
if (key != mapperTaskID) {
sinkTaskID = key;
if(checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
if(checkUserAccumulators(expectedAccVal, userAccumulators) &&
checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
LOG.info("Passed initial check for sink task.");
} else {
fail("Wrong accumulator results when sink task begins execution.");
......@@ -223,8 +280,9 @@ public class AccumulatorLiveITCase {
LOG.info("{}", flinkAccumulators);
LOG.info("{}", userAccumulators);
if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(sinkTaskID, i, 0, i*4, 0, flinkAccumulators)) {
LOG.info("Passed round " + i);
if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
checkFlinkAccumulators(sinkTaskID, i, 0, i * 4, 0, flinkAccumulators)) {
LOG.info("Passed round #" + i);
} else {
fail("Failed in round #" + i);
......@@ -235,9 +293,10 @@ public class AccumulatorLiveITCase {
private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
LOG.info("checking user accumulators");
return accumulatorMap.containsKey(NAME) && expected == ((IntCounter)accumulatorMap.get(NAME)).getLocalValue();
return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
private static boolean checkFlinkAccumulators(ExecutionAttemptID taskKey, int expectedRecordsIn, int expectedRecordsOut, int expectedBytesIn, int expectedBytesOut,
......@@ -253,12 +312,12 @@ public class AccumulatorLiveITCase {
* The following two cases are for the DataSource and Map task
if(((LongCounter) entry.getValue()).getLocalValue() != expectedRecordsOut) {
if(((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsOut) {
return false;
if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytesOut) {
if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesOut) {
return false;
......@@ -266,12 +325,12 @@ public class AccumulatorLiveITCase {
* The following two cases are for the DataSink task
if (((LongCounter) entry.getValue()).getLocalValue() != expectedRecordsIn) {
if (((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsIn) {
return false;
if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytesIn) {
if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesIn) {
return false;
......@@ -284,15 +343,17 @@ public class AccumulatorLiveITCase {
* UDF that waits for at least the heartbeat interval's duration.
* UDF that notifies when it changes the accumulator values
private static class WaitingUDF extends RichFlatMapFunction<String, Integer> {
private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
private IntCounter counter = new IntCounter();
private static boolean finished = false;
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator(NAME, counter);
getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
......@@ -305,9 +366,16 @@ public class AccumulatorLiveITCase {
public void close() throws Exception {
finished = true;
private static class WaitingOutputFormat implements OutputFormat<Integer> {
* Outputs format which notifies of accumulator changes and waits for the previous mapper.
private static class NotifyingOutputFormat implements OutputFormat<Integer> {
public void configure(Configuration parameters) {
......@@ -315,6 +383,11 @@ public class AccumulatorLiveITCase {
public void open(int taskNumber, int numTasks) throws IOException {
while (!NotifyingMapper.finished) {
try {
} catch (InterruptedException e) {}
......@@ -334,7 +407,7 @@ public class AccumulatorLiveITCase {
public static void notifyTaskManagerOfAccumulatorUpdate() {
new JavaTestKit(system) {{
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Timeout timeout = new Timeout(TIMEOUT);
Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
try {
Await.result(ask, timeout.duration());
......@@ -354,7 +427,7 @@ public class AccumulatorLiveITCase {
return jgg.compileJobGraph(op);
private static class PlanExtractor extends LocalEnvironment {
private static class BatchPlanExtractor extends LocalEnvironment {
private Plan plan = null;
......@@ -363,6 +436,22 @@ public class AccumulatorLiveITCase {
plan = createProgramPlan();
return new JobExecutionResult(new JobID(), -1, null);
private static class StreamJobExtractor extends StreamExecutionEnvironment {
private JobGraph graph = null;
public JobExecutionResult execute() throws Exception {
return execute("default");
public JobExecutionResult execute(String jobName) throws Exception {
graph = this.streamGraph.getJobGraph();
return new JobExecutionResult(new JobID(), -1, null);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册