diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index 41ca280d200e7c890b35de59efe6176a607a8121..7a591862d11f39c2bc6a140ecb60b7a80094ef89 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -57,7 +57,16 @@ public class LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private static Comparator segmentHolderComparator = Comparators.inverse(DataSegment.bucketMonthComparator()); + private static Comparator segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator()); + + private static void executeCallbacks(List callbacks) + { + for (LoadPeonCallback callback : callbacks) { + if (callback != null) { + callback.execute(); + } + } + } private final CuratorFramework curator; private final String basePath; @@ -70,10 +79,10 @@ public class LoadQueuePeon private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( - segmentHolderComparator + segmentComparator ); private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( - segmentHolderComparator + segmentComparator ); private final Object lock = new Object(); @@ -120,8 +129,8 @@ public class LoadQueuePeon } public void loadSegment( - DataSegment segment, - LoadPeonCallback callback + final DataSegment segment, + final LoadPeonCallback callback ) { synchronized (lock) { @@ -134,8 +143,6 @@ public class LoadQueuePeon } } - final SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback)); - synchronized (lock) { final SegmentHolder existingHolder = segmentsToLoad.get(segment); if (existingHolder != null) { @@ -148,13 +155,13 @@ public class LoadQueuePeon log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); queuedSize.addAndGet(segment.getSize()); - segmentsToLoad.put(segment, holder); + segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback))); doNext(); } public void dropSegment( - DataSegment segment, - LoadPeonCallback callback + final DataSegment segment, + final LoadPeonCallback callback ) { synchronized (lock) { @@ -167,8 +174,6 @@ public class LoadQueuePeon } } - SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback)); - synchronized (lock) { final SegmentHolder existingHolder = segmentsToDrop.get(segment); if (existingHolder != null) { @@ -180,7 +185,7 @@ public class LoadQueuePeon } log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); - segmentsToDrop.put(segment, holder); + segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback))); doNext(); } @@ -305,14 +310,15 @@ public class LoadQueuePeon throw new UnsupportedOperationException(); } + final List callbacks = currentlyProcessing.getCallbacks(); + currentlyProcessing = null; callBackExecutor.execute( new Runnable() { @Override public void run() { - currentlyProcessing.executeCallbacks(); - currentlyProcessing = null; + executeCallbacks(callbacks); } } ); @@ -323,20 +329,20 @@ public class LoadQueuePeon { synchronized (lock) { if (currentlyProcessing != null) { - currentlyProcessing.executeCallbacks(); + executeCallbacks(currentlyProcessing.getCallbacks()); currentlyProcessing = null; } if (!segmentsToDrop.isEmpty()) { for (SegmentHolder holder : segmentsToDrop.values()) { - holder.executeCallbacks(); + executeCallbacks(holder.getCallbacks()); } } segmentsToDrop.clear(); if (!segmentsToLoad.isEmpty()) { for (SegmentHolder holder : segmentsToLoad.values()) { - holder.executeCallbacks(); + executeCallbacks(holder.getCallbacks()); } } segmentsToLoad.clear(); @@ -433,15 +439,10 @@ public class LoadQueuePeon } } - public void executeCallbacks() + public List getCallbacks() { synchronized (callbacks) { - for (LoadPeonCallback callback : callbacks) { - if (callback != null) { - callback.execute(); - } - } - callbacks.clear(); + return callbacks; } }