From ad94cc7727b4ec3ea35fb45cc58e2d5dd1950a8a Mon Sep 17 00:00:00 2001 From: Jason Song Date: Sun, 22 Jan 2017 17:43:06 +0800 Subject: [PATCH] do async notification if there are too many clients to notify --- .../apollo/biz/config/BizConfig.java | 7 +++ .../controller/NotificationControllerV2.java | 36 +++++++++++++ .../NotificationControllerV2Test.java | 52 +++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java b/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java index 835f67949..d8569b95a 100644 --- a/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java +++ b/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java @@ -105,4 +105,11 @@ public class BizConfig extends RefreshableConfig { return TimeUnit.SECONDS; } + public int releaseMessageNotificationBatch() { + return getIntProperty("apollo.release-message.notification.batch", 100); + } + + public int releaseMessageNotificationBatchIntervalInMilli() { + return getIntProperty("apollo.release-message.notification.batch.interval", 100); + } } diff --git a/apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java b/apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java index 55bc9201b..5a9f9b267 100644 --- a/apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java +++ b/apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java @@ -12,6 +12,7 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import com.ctrip.framework.apollo.biz.config.BizConfig; import com.ctrip.framework.apollo.biz.entity.ReleaseMessage; import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener; import com.ctrip.framework.apollo.biz.message.Topics; @@ -22,6 +23,7 @@ import com.ctrip.framework.apollo.configservice.util.NamespaceUtil; import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil; import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification; +import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory; import com.ctrip.framework.apollo.tracer.Tracer; import org.slf4j.Logger; @@ -41,6 +43,9 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * @author Jason Song(song_s@ctrip.com) @@ -61,6 +66,8 @@ public class NotificationControllerV2 implements ReleaseMessageListener { new TypeToken>() { }.getType(); + private final ExecutorService largeNotificationBatchExecutorService; + @Autowired private WatchKeysUtil watchKeysUtil; @@ -76,6 +83,14 @@ public class NotificationControllerV2 implements ReleaseMessageListener { @Autowired private Gson gson; + @Autowired + private BizConfig bizConfig; + + public NotificationControllerV2() { + largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create + ("NotificationControllerV2", true)); + } + @RequestMapping(method = RequestMethod.GET) public DeferredResult>> pollNotification( @RequestParam(value = "appId") String appId, @@ -220,6 +235,27 @@ public class NotificationControllerV2 implements ReleaseMessageListener { //create a new list to avoid ConcurrentModificationException List>>> results = Lists.newArrayList(deferredResults.get(content)); + + //do async notification if too many clients + if (results.size() > bizConfig.releaseMessageNotificationBatch()) { + largeNotificationBatchExecutorService.submit(() -> { + logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content, + bizConfig.releaseMessageNotificationBatch()); + for (int i = 0; i < results.size(); i++) { + if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) { + try { + TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli()); + } catch (InterruptedException e) { + //ignore + } + } + logger.debug("Async notify {}", results.get(i)); + results.get(i).setResult(notification); + } + }); + return; + } + logger.debug("Notify {} clients for key {}", results.size(), content); for (DeferredResult>> result : results) { diff --git a/apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java b/apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java index c05af8be8..d0b7d7da2 100644 --- a/apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java +++ b/apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java @@ -7,6 +7,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.gson.Gson; +import com.ctrip.framework.apollo.biz.config.BizConfig; import com.ctrip.framework.apollo.biz.entity.ReleaseMessage; import com.ctrip.framework.apollo.biz.message.Topics; import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil; @@ -27,6 +28,7 @@ import org.springframework.test.util.ReflectionTestUtils; import org.springframework.web.context.request.async.DeferredResult; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -57,6 +59,9 @@ public class NotificationControllerV2Test { private NamespaceUtil namespaceUtil; @Mock private WatchKeysUtil watchKeysUtil; + @Mock + private BizConfig bizConfig; + private Gson gson; private Multimap>>> @@ -66,11 +71,16 @@ public class NotificationControllerV2Test { public void setUp() throws Exception { controller = new NotificationControllerV2(); gson = new Gson(); + + when(bizConfig.releaseMessageNotificationBatch()).thenReturn(100); + when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(5); + ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService); ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil); ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil); ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil); ReflectionTestUtils.setField(controller, "gson", gson); + ReflectionTestUtils.setField(controller, "bizConfig", bizConfig); someAppId = "someAppId"; someCluster = "someCluster"; @@ -283,6 +293,48 @@ public class NotificationControllerV2Test { assertEquals(someId, notification.getNotificationId()); } + @Test + public void testPollNotificationWithHandleMessageInBatch() throws Exception { + String someWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR) + .join(someAppId, someCluster, defaultNamespace); + int someBatch = 1; + int someBatchInterval = 10; + + Multimap watchKeysMap = + assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey)); + + String notificationAsString = + transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId); + + when(watchKeysUtil + .assembleAllWatchKeys(someAppId, someCluster, Sets.newHashSet(defaultNamespace), + someDataCenter)).thenReturn(watchKeysMap); + + when(bizConfig.releaseMessageNotificationBatch()).thenReturn(someBatch); + when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(someBatchInterval); + + DeferredResult>> + deferredResult = controller + .pollNotification(someAppId, someCluster, notificationAsString, someDataCenter, + someClientIp); + DeferredResult>> + anotherDeferredResult = controller + .pollNotification(someAppId, someCluster, notificationAsString, someDataCenter, + someClientIp); + + long someId = 1; + ReleaseMessage someReleaseMessage = new ReleaseMessage(someWatchKey); + someReleaseMessage.setId(someId); + + controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC); + + assertTrue(!anotherDeferredResult.hasResult()); + + TimeUnit.MILLISECONDS.sleep(someBatchInterval * 3); + + assertTrue(anotherDeferredResult.hasResult()); + } + private String transformApolloConfigNotificationsToString( String namespace, long notificationId) { List notifications = -- GitLab