diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index f0e7355a1c6a9934c6ba47643e1028b66eaa19b0..5424a8af8ad4bde0b8c59581010500abba6e0706 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -253,7 +253,7 @@ public class DruidClusterBridge log.warn(e, "Unable to close leaderLatch, ignoring"); } - exec.shutdownNow(); + exec.shutdown(); started = false; } diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index b970f17778a0ee72850caacbaccc33b7a60b2db1..59f13d7436323861b25dad3df8f9f954bfe7ce12 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -148,8 +148,12 @@ public class DruidClusterBridgeTest 0 ); DbSegmentPublisher dbSegmentPublisher = EasyMock.createMock(DbSegmentPublisher.class); + EasyMock.replay(dbSegmentPublisher); DatabaseSegmentManager databaseSegmentManager = EasyMock.createMock(DatabaseSegmentManager.class); + EasyMock.replay(databaseSegmentManager); ServerView serverView = EasyMock.createMock(ServerView.class); + EasyMock.replay(serverView); + BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator( jsonMapper, zkPathsConfig, @@ -163,9 +167,9 @@ public class DruidClusterBridgeTest Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor()); announcer.start(); announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me)); + BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class); BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class); - EasyMock.expect(batchServerInventoryView.getInventory()).andReturn( Arrays.asList( new DruidServer("1", "localhost", 117, "historical", DruidServer.DEFAULT_TIER, 0), @@ -187,7 +191,6 @@ public class DruidClusterBridgeTest EasyMock.expectLastCall(); EasyMock.replay(batchServerInventoryView); - DruidClusterBridge bridge = new DruidClusterBridge( jsonMapper, config, @@ -224,21 +227,40 @@ public class DruidClusterBridgeTest retry++; } - DruidServerMetadata announced = jsonMapper.readValue( - remoteCf.getData().forPath(path), - DruidServerMetadata.class - ); + boolean verified = verifyUpdate(jsonMapper, path, remoteCf); + retry = 0; + while (!verified) { + if (retry > 5) { + throw new ISE("No updates to bridge node occurred"); + } - Assert.assertEquals(118, announced.getMaxSize()); + Thread.sleep(100); + retry++; - bridge.stop(); - EasyMock.verify(batchServerInventoryView); + verified = verifyUpdate(jsonMapper, path, remoteCf); + } announcer.stop(); + bridge.stop(); remoteCf.close(); remoteCluster.close(); localCf.close(); localCluster.close(); + + EasyMock.verify(batchServerInventoryView); + EasyMock.verify(dbSegmentPublisher); + EasyMock.verify(databaseSegmentManager); + EasyMock.verify(serverView); + } + + private boolean verifyUpdate(ObjectMapper jsonMapper, String path, CuratorFramework remoteCf) throws Exception + { + DruidServerMetadata announced = jsonMapper.readValue( + remoteCf.getData().forPath(path), + DruidServerMetadata.class + ); + + return (118 == announced.getMaxSize()); } }