未验证 提交 ca326529 编写于 作者: J Jihoon Son 提交者: GitHub

Fix potential deadlock in batch ingestion (#10736)

* Fix potential deadlock in batch ingestion

* fix checkstyle and comment

* this is better
上级 3984457e
......@@ -398,10 +398,13 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
// In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
// The given intervals are first converted to align with segment granularity. This is because,
// when an overwriting task finds a version for a given input row, it expects the interval
// associated with each version to be equal to or larger than the time bucket that the input row falls in.
// See ParallelIndexSupervisorTask.findVersion().
final Set<Interval> uniqueIntervals = new HashSet<>();
for (Interval interval : JodaUtils.condenseIntervals(intervals)) {
final Granularity segmentGranularity = getSegmentGranularity();
final Granularity segmentGranularity = getSegmentGranularity();
for (Interval interval : intervals) {
if (segmentGranularity == null) {
uniqueIntervals.add(interval);
} else {
......@@ -409,7 +412,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
}
for (Interval interval : uniqueIntervals) {
// Condense intervals to avoid creating too many locks.
for (Interval interval : JodaUtils.condenseIntervals(uniqueIntervals)) {
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
if (lock == null) {
return false;
......
......@@ -21,15 +21,12 @@ package org.apache.druid.indexing.common.task;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentLockReleaseAction;
import org.apache.druid.indexing.common.actions.SegmentLockTryAcquireAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
......@@ -183,7 +180,6 @@ public class TaskLockHelper
private boolean tryLockSegments(TaskActionClient actionClient, List<DataSegment> segments) throws IOException
{
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
final Closer lockCloserOnError = Closer.create();
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<DataSegment> segmentsInInterval = entry.getValue();
......@@ -206,14 +202,7 @@ public class TaskLockHelper
)
);
lockResults.stream()
.filter(LockResult::isOk)
.map(result -> (SegmentLock) result.getTaskLock())
.forEach(segmentLock -> lockCloserOnError.register(() -> actionClient.submit(
new SegmentLockReleaseAction(segmentLock.getInterval(), segmentLock.getPartitionId())
)));
if (lockResults.stream().anyMatch(result -> !result.isOk())) {
lockCloserOnError.close();
return false;
}
lockedExistingSegments.addAll(segmentsInInterval);
......
......@@ -775,6 +775,25 @@ public class TaskLockbox
}
}
/**
 * Releases every lock that {@link #findLockPossesForTask} reports for the given task.
 *
 * The whole sweep runs while holding {@code giant}. Each posse is released through
 * {@code unlock(task, interval, partitionId)}; the partition id is only supplied for
 * locks whose granularity is {@link LockGranularity#SEGMENT}, and is {@code null} otherwise.
 *
 * @param task the task whose locks should be released
 */
public void unlockAll(Task task)
{
  giant.lock();
  try {
    for (final TaskLockPosse posse : findLockPossesForTask(task)) {
      // Only segment-granularity locks carry a partition id.
      final boolean segmentGranularity = posse.getTaskLock().getGranularity() == LockGranularity.SEGMENT;
      final Integer partitionId = segmentGranularity
                                  ? ((SegmentLock) posse.taskLock).getPartitionId()
                                  : null;
      unlock(task, posse.getTaskLock().getInterval(), partitionId);
    }
  }
  finally {
    giant.unlock();
  }
}
public void add(Task task)
{
giant.lock();
......@@ -798,15 +817,7 @@ public class TaskLockbox
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
unlock(
task,
taskLockPosse.getTaskLock().getInterval(),
taskLockPosse.getTaskLock().getGranularity() == LockGranularity.SEGMENT
? ((SegmentLock) taskLockPosse.taskLock).getPartitionId()
: null
);
}
unlockAll(task);
}
finally {
activeTasks.remove(task.getId());
......
......@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
......@@ -128,6 +129,12 @@ public class TaskQueue
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
}
/**
 * Overwrites the {@code active} flag of this queue without going through the normal
 * start/stop lifecycle. Package-private and marked {@link VisibleForTesting}: it exists so
 * that tests can put the queue into the desired state before driving it manually.
 *
 * NOTE(review): presumably this is the same flag that start() sets and that add(Task)
 * checks before accepting tasks -- confirm against the rest of the class.
 *
 * @param active the new value of the flag
 */
@VisibleForTesting
void setActive(boolean active)
{
this.active = active;
}
/**
* Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
*/
......@@ -242,65 +249,7 @@ public class TaskQueue
giant.lock();
try {
// Task futures available from the taskRunner
final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
}
// Attain futures for all active tasks (assuming they are ready to run).
// Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
if (!taskFutures.containsKey(task.getId())) {
final ListenableFuture<TaskStatus> runnerTaskFuture;
if (runnerTaskFutures.containsKey(task.getId())) {
runnerTaskFuture = runnerTaskFutures.get(task.getId());
} else {
// Task should be running, so run it.
final boolean taskIsReady;
try {
taskIsReady = task.isReady(taskActionClientFactory.create(task));
}
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
continue;
}
if (taskIsReady) {
log.info("Asking taskRunner to run: %s", task.getId());
runnerTaskFuture = taskRunner.run(task);
} else {
continue;
}
}
taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
} else if (isTaskPending(task)) {
// if the taskFutures contain this task and this task is pending, also let the taskRunner
// to run it to guarantee it will be assigned to run
// see https://github.com/apache/druid/pull/6991
taskRunner.run(task);
}
}
// Kill tasks that shouldn't be running
final Set<String> knownTaskIds = tasks
.stream()
.map(Task::getId)
.collect(Collectors.toSet());
final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
if (!tasksToKill.isEmpty()) {
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
for (final String taskId : tasksToKill) {
try {
taskRunner.shutdown(
taskId,
"task is not in knownTaskIds[%s]",
knownTaskIds
);
}
catch (Exception e) {
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
}
}
}
manageInternal();
// awaitNanos because management may become necessary without this condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
......@@ -311,6 +260,73 @@ public class TaskQueue
}
}
/**
 * Performs a single management pass over the queue:
 * <ol>
 * <li>snapshots the futures of all tasks the task runner already knows about,</li>
 * <li>for every queued task without a tracked future, either adopts the runner's future or,
 * if {@code isReady} succeeds, submits the task to the runner,</li>
 * <li>asks the runner to shut down every task it knows about that is not in {@code tasks}.</li>
 * </ol>
 *
 * This method does not acquire {@code giant} itself; the management code path in this class
 * calls it after taking that lock. It is package-private and {@link VisibleForTesting} so that
 * tests can run exactly one pass.
 */
@VisibleForTesting
void manageInternal()
{
// Task futures available from the taskRunner
final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
}
// Attain futures for all active tasks (assuming they are ready to run).
// Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
if (!taskFutures.containsKey(task.getId())) {
final ListenableFuture<TaskStatus> runnerTaskFuture;
if (runnerTaskFutures.containsKey(task.getId())) {
// The runner already knows this task; reuse its future instead of submitting again.
runnerTaskFuture = runnerTaskFutures.get(task.getId());
} else {
// Task should be running, so run it.
final boolean taskIsReady;
try {
taskIsReady = task.isReady(taskActionClientFactory.create(task));
}
catch (Exception e) {
// An exception from isReady() fails the task outright rather than retrying on the next pass.
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
continue;
}
if (taskIsReady) {
log.info("Asking taskRunner to run: %s", task.getId());
runnerTaskFuture = taskRunner.run(task);
} else {
// Task.isReady() can internally lock intervals or segments.
// We should release them if the task is not ready, so that a partially locked,
// not-yet-runnable task cannot block other tasks that need the same intervals.
taskLockbox.unlockAll(task);
continue;
}
}
// Track the future so that later passes do not submit the task a second time.
taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
} else if (isTaskPending(task)) {
// if the taskFutures contain this task and this task is pending, also let the taskRunner
// to run it to guarantee it will be assigned to run
// see https://github.com/apache/druid/pull/6991
taskRunner.run(task);
}
}
// Kill tasks that shouldn't be running
final Set<String> knownTaskIds = tasks
.stream()
.map(Task::getId)
.collect(Collectors.toSet());
// Ids the runner reported at the start of this pass that are not in the queue any more.
final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
if (!tasksToKill.isEmpty()) {
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
for (final String taskId : tasksToKill) {
try {
taskRunner.shutdown(
taskId,
"task is not in knownTaskIds[%s]",
knownTaskIds
);
}
catch (Exception e) {
// Best effort: a failed shutdown is logged and the remaining ids are still processed.
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
}
}
}
}
private boolean isTaskPending(Task task)
{
return taskRunner.getPendingTasks()
......
......@@ -32,6 +32,8 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
......@@ -127,6 +129,11 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
temporaryFolder.delete();
}
/**
 * Creates a new {@link TestLocalTaskActionClientFactory}. Useful for components that need a
 * factory rather than a single per-task client.
 *
 * @return a freshly constructed factory
 */
public TestLocalTaskActionClientFactory createActionClientFactory()
{
return new TestLocalTaskActionClientFactory();
}
public TestLocalTaskActionClient createActionClient(Task task)
{
return new TestLocalTaskActionClient(task);
......@@ -210,6 +217,15 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
return testUtils.getTestIndexMergerV9();
}
/**
 * {@link TaskActionClientFactory} for tests: every call to {@link #create(Task)} returns a new
 * {@link TestLocalTaskActionClient} for the given task.
 *
 * NOTE(review): declared as a non-static inner class, presumably because
 * TestLocalTaskActionClient depends on state of the enclosing test base -- confirm.
 */
public class TestLocalTaskActionClientFactory implements TaskActionClientFactory
{
@Override
public TaskActionClient create(Task task)
{
return new TestLocalTaskActionClient(task);
}
}
public class TestLocalTaskActionClient extends CountingLocalTaskActionClientForTest
{
private final Set<DataSegment> publishedSegments = new HashSet<>();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
/**
 * Tests for {@link TaskQueue} that drive single management passes through
 * {@link TaskQueue#manageInternal()} with an in-process, synchronous task runner.
 */
public class TaskQueueTest extends IngestionTestBase
{
  private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY;

  /**
   * This test verifies releasing all locks of a task when it is not ready to run yet.
   *
   * This test uses 2 APIs, {@link TaskQueue} APIs and {@link IngestionTestBase} APIs
   * to emulate the scenario of deadlock. The IngestionTestBase provides low-level APIs
   * with which you can manipulate {@link TaskLockbox} manually. These APIs should be used
   * only to emulate a certain deadlock scenario. All normal tasks should use TaskQueue
   * APIs.
   */
  @Test
  public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception
  {
    final TaskActionClientFactory actionClientFactory = createActionClientFactory();
    final TaskQueue taskQueue = new TaskQueue(
        new TaskLockConfig(),
        new TaskQueueConfig(null, null, null, null),
        getTaskStorage(),
        new SimpleTaskRunner(actionClientFactory),
        actionClientFactory,
        getLockbox(),
        new NoopServiceEmitter()
    );
    // The queue is driven manually through manageInternal(), so it is only flagged as active
    // instead of being started.
    taskQueue.setActive(true);

    // task1 emulates a case when there is a task that was issued before task2 and acquired locks
    // conflicting with task2.
    final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
    // Manually get locks for task1. task2 cannot be ready because of task1.
    prepareTaskForLocking(task1);
    Assert.assertTrue(task1.isReady(actionClientFactory.create(task1)));

    // task2 overlaps with task1, so it must stay blocked and must not keep any lock.
    final TestTask task2 = new TestTask("t2", Intervals.of("2021-01-31/P1M"));
    taskQueue.add(task2);
    taskQueue.manageInternal();
    Assert.assertFalse(task2.isDone());
    Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty());

    // task3 overlaps with task2 but not with task1. It can run because task2 is still blocked
    // by task1 and holds no locks.
    final TestTask task3 = new TestTask("t3", Intervals.of("2021-02-01/P1M"));
    taskQueue.add(task3);
    taskQueue.manageInternal();
    Assert.assertFalse(task2.isDone());
    Assert.assertTrue(task3.isDone());
    Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty());

    // Shut down task1 and task3 and release their locks.
    shutdownTask(task1);
    taskQueue.shutdown(task3.getId(), "Emulating shutdown of task3");

    // Now task2 should run.
    taskQueue.manageInternal();
    Assert.assertTrue(task2.isDone());
  }

  /**
   * Minimal batch task: it is ready once it can take a time chunk lock on its interval,
   * and running it only records that it ran.
   */
  private static class TestTask extends AbstractBatchIndexTask
  {
    private final Interval interval;
    private boolean done;

    private TestTask(String id, Interval interval)
    {
      super(id, "datasource", null);
      this.interval = interval;
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception
    {
      return tryTimeChunkLock(taskActionClient, ImmutableList.of(interval));
    }

    @Override
    public TaskStatus runTask(TaskToolbox toolbox)
    {
      done = true;
      return TaskStatus.success(getId());
    }

    @Override
    public boolean requireLockExistingSegments()
    {
      return false;
    }

    @Override
    public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
    {
      // This task never locks existing segments; report "nothing to lock" instead of null.
      return Collections.emptyList();
    }

    @Override
    public boolean isPerfectRollup()
    {
      return false;
    }

    @Nullable
    @Override
    public Granularity getSegmentGranularity()
    {
      return SEGMENT_GRANULARITY;
    }

    @Override
    public String getType()
    {
      return "test";
    }

    /**
     * @return true once {@link #runTask} has been invoked
     */
    public boolean isDone()
    {
      return done;
    }
  }

  /**
   * {@link TaskRunner} stub that runs a submitted task synchronously on the calling thread and
   * returns an already-completed future. It tracks no tasks, so all query methods report empty
   * collections and zero slot counts.
   */
  private static class SimpleTaskRunner implements TaskRunner
  {
    private final TaskActionClientFactory actionClientFactory;

    private SimpleTaskRunner(TaskActionClientFactory actionClientFactory)
    {
      this.actionClientFactory = actionClientFactory;
    }

    @Override
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
    {
      return Collections.emptyList();
    }

    @Override
    public void start()
    {
    }

    @Override
    public void registerListener(TaskRunnerListener listener, Executor executor)
    {
    }

    @Override
    public void unregisterListener(String listenerId)
    {
    }

    @Override
    public ListenableFuture<TaskStatus> run(Task task)
    {
      try {
        // The task only needs an action client from the toolbox, so everything else is mocked.
        final TaskToolbox toolbox = Mockito.mock(TaskToolbox.class);
        Mockito.when(toolbox.getTaskActionClient()).thenReturn(actionClientFactory.create(task));
        return Futures.immediateFuture(task.run(toolbox));
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void shutdown(String taskid, String reason)
    {
    }

    @Override
    public void stop()
    {
    }

    @Override
    public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
    {
      return Collections.emptyList();
    }

    @Override
    public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
    {
      // Must not be null: TaskQueue dereferences this collection when checking for pending tasks.
      return Collections.emptyList();
    }

    @Override
    public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
    {
      return Collections.emptyList();
    }

    @Override
    public Optional<ScalingStats> getScalingStats()
    {
      return Optional.absent();
    }

    @Override
    public long getTotalTaskSlotCount()
    {
      return 0;
    }

    @Override
    public long getIdleTaskSlotCount()
    {
      return 0;
    }

    @Override
    public long getUsedTaskSlotCount()
    {
      return 0;
    }

    @Override
    public long getLazyTaskSlotCount()
    {
      return 0;
    }

    @Override
    public long getBlacklistedTaskSlotCount()
    {
      return 0;
    }
  }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册