提交 893a62f5 编写于 作者: A Aljoscha Krettek

[FLINK-3261] Allow Task to decline checkpoint request if not ready

Before, it could happen that a StreamingTask receives a Checkpoint
Trigger message while internally not being ready. The checkpoint
coordinator would then wait the specified timeout interval before
continuing. Now, tasks can signal that they are not ready and the
checkpoint coordinator will dicard a checkpoint for which is this the
case and trigger new checkpoints if necessary.

The newly triggered checkpoints will also release alignment locks in
streaming tasks that are still waiting for barriers from failed
checkpoints.
上级 1a5ce4da
......@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.slf4j.Logger;
......@@ -504,6 +505,86 @@ public class CheckpointCoordinator {
}
}
/**
* Receives a {@link DeclineCheckpoint} message and returns whether the
* message was associated with a pending checkpoint.
*
* @param message Checkpoint decline from the task manager
*
* @return Flag indicating whether the declined checkpoint was associated
* with a pending checkpoint.
*/
public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
return false;
}
final long checkpointId = message.getCheckpointId();
CompletedCheckpoint completed = null;
PendingCheckpoint checkpoint;
// Flag indicating whether the ack message was for a known pending
// checkpoint.
boolean isPendingCheckpoint;
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true;
LOG.info("Discarding checkpoint " + checkpointId
+ " because of checkpoint decline from task " + message.getTaskExecutionId());
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
rememberRecentCheckpointId(checkpointId);
boolean haveMoreRecentPending = false;
Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
while (entries.hasNext()) {
PendingCheckpoint p = entries.next().getValue();
if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
haveMoreRecentPending = true;
break;
}
}
if (!haveMoreRecentPending && !triggerRequestQueued) {
LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
triggerCheckpoint(System.currentTimeMillis());
} else if (!haveMoreRecentPending) {
LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
triggerQueuedRequests();
}
} else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
} else {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
isPendingCheckpoint = true;
LOG.info("Received another decline checkpoint message for now expired checkpoint attempt " + checkpointId);
} else {
isPendingCheckpoint = false;
}
}
}
return isPendingCheckpoint;
}
/**
* Receives an AcknowledgeCheckpoint message and returns whether the
* message was associated with a pending checkpoint.
......
......@@ -43,8 +43,10 @@ public interface StatefulTask<T extends StateHandle<?>> {
*
* @param checkpointId The ID of the checkpoint, incrementing.
* @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
*
* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
*/
void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
* {@link org.apache.flink.runtime.jobmanager.JobManager} to tell the checkpoint coordinator
* that a checkpoint request could not be heeded. This can happen if a Task is already in
* RUNNING state but is internally not yet ready to perform checkpoints.
*/
public class DeclineCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
private static final long serialVersionUID = 2094094662279578953L;
/** The timestamp associated with the checkpoint */
private final long timestamp;
public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
super(job, taskExecutionId, checkpointId);
this.timestamp = timestamp;
}
// --------------------------------------------------------------------------------------------
public long getTimestamp() {
return timestamp;
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
else if (o instanceof DeclineCheckpoint) {
DeclineCheckpoint that = (DeclineCheckpoint) o;
return this.timestamp == that.timestamp && super.equals(o);
}
else {
return false;
}
}
@Override
public String toString() {
return String.format("Declined Checkpoint %d@%d for (%s/%s)",
getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
}
}
......@@ -53,6 +53,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.util.SerializedValue;
......@@ -874,7 +875,11 @@ public class Task implements Runnable {
@Override
public void run() {
try {
statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
if (!success) {
DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
jobManager.tell(decline);
}
}
catch (Throwable t) {
failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
......
......@@ -51,7 +51,7 @@ import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage, AcknowledgeCheckpoint}
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
......@@ -1153,12 +1153,50 @@ class JobManager(
}
else {
log.error(
s"Received ConfirmCheckpoint message for job $jid with no CheckpointCoordinator")
s"Received AcknowledgeCheckpoint message for job $jid with no " +
s"CheckpointCoordinator")
}
case None => log.error(s"Received ConfirmCheckpoint for unavailable job $jid")
case None => log.error(s"Received AcknowledgeCheckpoint for unavailable job $jid")
}
case declineMessage: DeclineCheckpoint =>
val jid = declineMessage.getJob()
currentJobs.get(jid) match {
case Some((graph, _)) =>
val checkpointCoordinator = graph.getCheckpointCoordinator()
val savepointCoordinator = graph.getSavepointCoordinator()
if (checkpointCoordinator != null && savepointCoordinator != null) {
future {
try {
if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
// OK, this is the common case
}
else {
// Try the savepoint coordinator if the message was not addressed
// to the periodic checkpoint coordinator.
if (!savepointCoordinator.receiveDeclineMessage(declineMessage)) {
log.info("Received message for non-existing checkpoint " +
declineMessage.getCheckpointId)
}
}
}
catch {
case t: Throwable =>
log.error(s"Error in CheckpointCoordinator while processing $declineMessage", t)
}
}(context.dispatcher)
}
else {
log.error(
s"Received DeclineCheckpoint message for job $jid with no CheckpointCoordinator")
}
case None => log.error(s"Received DeclineCheckpoint for unavailable job $jid")
}
// unknown checkpoint message
case _ => unhandled(actorMessage)
}
......
......@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
......@@ -38,14 +39,11 @@ import org.mockito.stubbing.Answer;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
......@@ -194,6 +192,245 @@ public class CheckpointCoordinatorTest {
}
}
/**
* This test triggers a checkpoint and then sends a decline checkpoint message from
* one of the tasks. The expected behaviour is that said checkpoint is discarded and a new
* checkpoint is triggered.
*/
@Test
public void testTriggerAndDeclineCheckpointSimple() {
try {
final JobID jid = new JobID();
final long timestamp = System.currentTimeMillis();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
jid, 600000, 600000,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp));
// validate that we have a pending checkpoint
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
assertNotNull(checkpoint);
assertEquals(checkpointId, checkpoint.getCheckpointId());
assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
assertEquals(jid, checkpoint.getJobId());
assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint.getCollectedStates().size());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// check that the vertices received the trigger checkpoint message
{
TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointId, timestamp);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
// acknowledge from one of the tasks
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the same task again (should not matter)
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// decline checkpoint from the other task, this should cancel the checkpoint
// and trigger a new one
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
assertTrue(checkpoint.isDiscarded());
// validate that we have a new pending checkpoint
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
assertNotNull(checkpointNew);
assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
assertEquals(jid, checkpointNew.getJobId());
assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpointNew.getCollectedStates().size());
assertFalse(checkpointNew.isDiscarded());
assertFalse(checkpointNew.isFullyAcknowledged());
assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
// check that the vertices received the new trigger checkpoint message
{
TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
// decline again, nothing should happen
// decline from the other task, nothing should happen
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
assertTrue(checkpoint.isDiscarded());
// should still have the same second checkpoint pending
long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
assertEquals(checkpointIdNew2, checkpointIdNew);
coord.shutdown();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/**
* This test triggers two checkpoints and then sends a decline message from one of the tasks
* for the first checkpoint. This should discard the first checkpoint while not triggering
* a new checkpoint because a later checkpoint is already in progress.
*/
@Test
public void testTriggerAndDeclineCheckpointComplex() {
try {
final JobID jid = new JobID();
final long timestamp = System.currentTimeMillis();
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
jid, 600000, 600000,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp));
// trigger second checkpoint, should also succeed
assertTrue(coord.triggerCheckpoint(timestamp + 2));
// validate that we have a pending checkpoint
assertEquals(2, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
long checkpoint1Id = it.next().getKey();
long checkpoint2Id = it.next().getKey();
PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);
assertNotNull(checkpoint1);
assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
assertEquals(timestamp, checkpoint1.getCheckpointTimestamp());
assertEquals(jid, checkpoint1.getJobId());
assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint1.getCollectedStates().size());
assertFalse(checkpoint1.isDiscarded());
assertFalse(checkpoint1.isFullyAcknowledged());
assertNotNull(checkpoint2);
assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
assertEquals(timestamp + 2, checkpoint2.getCheckpointTimestamp());
assertEquals(jid, checkpoint2.getJobId());
assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpoint2.getCollectedStates().size());
assertFalse(checkpoint2.isDiscarded());
assertFalse(checkpoint2.isFullyAcknowledged());
// check that the vertices received the trigger checkpoint message
{
TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpoint1Id, timestamp);
TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpoint1Id, timestamp);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
// check that the vertices received the trigger checkpoint message for the second checkpoint
{
TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpoint2Id, timestamp + 2);
TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpoint2Id, timestamp + 2);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
// and trigger a new one
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
// validate that it is the same second checkpoint from earlier
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
assertEquals(checkpoint2Id, checkpointIdNew);
assertNotNull(checkpointNew);
assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
assertEquals(jid, checkpointNew.getJobId());
assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
assertEquals(0, checkpointNew.getCollectedStates().size());
assertFalse(checkpointNew.isDiscarded());
assertFalse(checkpointNew.isFullyAcknowledged());
assertNotEquals(checkpoint1.getCheckpointId(), checkpointNew.getCheckpointId());
// decline again, nothing should happen
// decline from the other task, nothing should happen
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
assertTrue(checkpoint1.isDiscarded());
coord.shutdown();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testTriggerAndConfirmSimpleCheckpoint() {
try {
......
......@@ -204,7 +204,7 @@ public class TaskAsyncCallTest {
}
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) {
public boolean triggerCheckpoint(long checkpointId, long timestamp) {
lastCheckpointId++;
if (checkpointId == lastCheckpointId) {
if (lastCheckpointId == NUM_CALLS) {
......@@ -217,6 +217,7 @@ public class TaskAsyncCallTest {
notifyAll();
}
}
return true;
}
@Override
......
......@@ -1137,7 +1137,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final int sourceParallelism,
final String topicName,
final int valuesCount, final int startFrom) throws Exception {
env.getCheckpointConfig().setCheckpointTimeout(5000); // set timeout for checkpoints to 5 seconds
final int finalCount = valuesCount * sourceParallelism;
......
......@@ -444,7 +444,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
@Override
@SuppressWarnings("unchecked,rawtypes")
public void triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
synchronized (lock) {
......@@ -526,6 +526,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
throw e;
}
}
return true;
} else {
return false;
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册